/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: impl.cpp 3495 2013-01-21 14:09:51Z rdempsey $

/* This code is based on udpcast-20090830. Most of the source code in that release
   contains no copyright or licensing notices at all. The exception is fec.c, which
   is not used here. The udpcast website, http://udpcast.linux.lu/, implies that
   the source is covered under GPL. */

#include <net/if.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <cerrno>
#include <pthread.h>
using namespace std;

#include "bytestream.h"
using namespace messageqcpp;

#include "udp-sender.h"
#include "udpc-protoc.h"
#include "util.h"
#include "mc_fifo.h"

#include "impl.h"

struct participantsDb
{
  int nrParticipants;

  struct clientDesc
  {
    struct sockaddr_in addr;
    int used;
    int capabilities;
    unsigned int rcvbuf;
  } clientTable[MAX_CLIENTS];
};
struct produconsum
{
  unsigned int size;
  volatile unsigned int produced;
  unsigned int consumed;
  volatile int atEnd;
  pthread_mutex_t mutex;
  volatile int consumerIsWaiting;
  pthread_cond_t cond;
  const char* name;
};

struct stats
{
  int fd;
  struct timeval lastPrinted;
  long statPeriod;
  int printUncompressedPos;
};

struct sender_stats
{
  FILE* log;
  unsigned long long totalBytes;
  unsigned long long retransmissions;
  int clNo;
  unsigned long periodBytes;
  struct timeval periodStart;
  long bwPeriod;
  struct stats s;
};

struct receiver_stats
{
  struct timeval tv_start;
  int bytesOrig;
  long long totalBytes;
  int timerStarted;
  struct stats s;
};

#define SLICEMAGIC 0x41424344

typedef struct slice
{
  int base; /* base address of slice in buffer */
  int sliceNo;
  int bytes;     /* bytes in slice */
  int nextBlock; /* index of next buffer to be transmitted */
  enum slice_state
  {
    SLICE_FREE,     /* free slice, and in the queue of free slices */
    SLICE_NEW,      /* newly allocated. FEC calculation and first
                     * transmission */
    SLICE_XMITTED,  /* transmitted */
    SLICE_ACKED,    /* acknowledged (if applicable) */
    SLICE_PRE_FREE, /* no longer used, but not returned to queue */
    SLICE_RECEIVING,
    SLICE_DONE,
  };
  volatile enum slice_state state;
  char rxmitMap[MAX_SLICE_SIZE / BITS_PER_CHAR];
  /* blocks to be retransmitted */

  char isXmittedMap[MAX_SLICE_SIZE / BITS_PER_CHAR];
  /* blocks which have already been retransmitted during this round*/

  int rxmitId; /* used to distinguish among several retransmission
                * requests, so that we can easily discard answers to "old"
                * requests */

  /* This structure is used to keep track of clients who answered, and
   * to make the reqack message
   */
  struct reqackBm
  {
    struct reqack ra;
    char readySet[MAX_CLIENTS / BITS_PER_CHAR]; /* who is already ok? */
  } sl_reqack;

  char answeredSet[MAX_CLIENTS / BITS_PER_CHAR]; /* who answered at all? */

  int nrReady;       /* number of participants who are ready */
  int nrAnswered;    /* number of participants who answered; */
  int needRxmit;     /* does this need retransmission? */
  int lastGoodBlock; /* last good block of slice (i.e. last block having not
                      * needed retransmission */

  int lastReqack; /* last req ack sent (debug) */
#ifdef BB_FEATURE_UDPCAST_FEC
  unsigned char* fec_data;
#endif
  int magic;
  int blocksTransferred;     /* blocks transferred during this slice */
  int dataBlocksTransferred; /* data blocks transferred during this slice */
  struct retransmit retransmit;
  int freePos;    /* where the next data part will be stored to */
  int bytesKnown; /* is number of bytes known yet? */
  short missing_data_blocks[MAX_FEC_INTERLEAVE];
} * slice_t;

#define QUEUE_SIZE 256

struct returnChannel
{
  pthread_t thread;        /* message receiving thread */
  int rcvSock;             /* socket on which we receive the messages */
  produconsum_t incoming;  /* where to enqueue incoming messages */
  produconsum_t freeSpace; /* free space */
  struct
  {
    int clNo;          /* client number */
    union message msg; /* its message */
  } q[QUEUE_SIZE];
  struct net_config* config;
  participantsDb_t participantsDb;
};
#define NR_SLICES 2

typedef struct senderState
{
  struct returnChannel rc;
  struct fifo* fifo;

  struct net_config* config;
  sender_stats_t stats;
  int socket;

  struct slice slices[NR_SLICES];

  produconsum_t free_slices_pc;

  unsigned char* fec_data;
  pthread_t fec_thread;
  produconsum_t fec_data_pc;
} * sender_state_t;

struct clientState
{
  struct fifo* fifo;
  struct client_config* client_config;
  struct net_config* net_config;
  union serverDataMsg Msg;

  struct msghdr data_hdr;

  /* pre-prepared messages */
  struct iovec data_iov[2];

  struct slice* currentSlice;
  int currentSliceNo;
  receiver_stats_t stats;

  produconsum_t free_slices_pc;
  struct slice slices[NR_SLICES];

  /* Completely received slices */
  int receivedPtr;
  int receivedSliceNo;

#ifdef BB_FEATURE_UDPCAST_FEC
  int use_fec; /* do we use forward error correction ? */
#endif
  produconsum_t fec_data_pc;
  struct slice* fec_slices[NR_SLICES];
  pthread_t fec_thread;

  /* A reservoir of free blocks for FEC */
  produconsum_t freeBlocks_pc;
  unsigned char** blockAddresses; /* adresses of blocks in local queue */

  unsigned char** localBlockAddresses;
  /* local blocks: freed FEC blocks after we
   * have received the corresponding data */
  int localPos;

  unsigned char* blockData;
  unsigned char* nextBlock;

  int endReached; /* end of transmission reached:
                     0: transmission in progress
                     2: network transmission _and_ FEC
                        processing finished
                  */

  int netEndReached; /* In case of a FEC transmission; network
                      * transmission finished. This is needed to avoid
                      * a race condition, where the receiver thread would
                      * already prepare to wait for more data, at the same
                      * time that the FEC would set endReached. To avoid
                      * this, we do a select without timeout before
                      * receiving the last few packets, so that if the
                      * race condition strikes, we have a way to protect
                      * against
                      */

  int selectedFd;

  int promptPrinted; /* Has "Press any key..." prompt already been printed */

#ifdef BB_FEATURE_UDPCAST_FEC
  fec_code_t fec_code;
#endif
};

#define S_UCAST socks[0]
#define S_BCAST socks[1]
#define S_MCAST_CTRL socks[2]
#define S_MCAST_DATA socks[3]

#define SSEND(x) SEND(client_config->S_UCAST, x, client_config->serverAddr)

/**
 * Receiver will passively listen to sender. Works best if sender runs
 * in async mode
 */
#define FLAG_PASSIVE 0x0010

/**
 * Do not write file synchronously
 */
#define FLAG_NOSYNC 0x0040

/*
 * Don't ask for keyboard input on receiver end.
 */
#define FLAG_NOKBD 0x0080

/**
 * Do write file synchronously
 */
#define FLAG_SYNC 0x0100

namespace
{
int udpc_isFullDuplex(int s, const char* ifname)
{
#ifdef ETHTOOL_GLINK
  struct ifreq ifr;
  struct ethtool_cmd ecmd;

  ecmd.cmd = ETHTOOL_GSET;

  strncpy(ifr.ifr_name, ifname, sizeof(ifr.ifr_name) - 1);
  ifr.ifr_data = (char*)&ecmd;

  if (ioctl(s, SIOCETHTOOL, &ifr) == -1)
  {
    /* Operation not supported */
    return -1;
  }
  else
  {
    return ecmd.duplex;
  }

#else
  return -1;
#endif
}

#define getSinAddr(addr) (((struct sockaddr_in*)addr)->sin_addr)

int udpc_ipIsZero(struct sockaddr_in* ip)
{
  return getSinAddr(ip).s_addr == 0;
}

int hasLink(int s, const char* ifname)
{
#ifdef ETHTOOL_GLINK
  struct ifreq ifr;
  struct ethtool_value edata;

  edata.cmd = ETHTOOL_GLINK;

  strncpy(ifr.ifr_name, ifname, sizeof(ifr.ifr_name) - 1);
  ifr.ifr_data = (char*)&edata;

  if (ioctl(s, SIOCETHTOOL, &ifr) == -1)
  {
    /* Operation not supported */
    return -1;
  }
  else
  {
    return edata.data;
  }

#else
  return -1;
#endif
}

#define INET_ATON(a, i) inet_aton(a, i)

int udpc_doSend(int s, void* message, size_t len, struct sockaddr_in* to)
{
  /*    flprintf("sent: %08x %d\n", *(int*) message, len);*/
#ifdef LOSSTEST
  loseSendPacket();
#endif
  return sendto(s, message, len, 0, (struct sockaddr*)to, sizeof(*to));
}

void udpc_copyToMessage(unsigned char* dst, struct sockaddr_in* src)
{
  memcpy(dst, (char*)&((struct sockaddr_in*)src)->sin_addr, sizeof(struct in_addr));
}

void udpc_sendHello(struct net_config* net_config, int sock, int streaming)
{
  // cerr << "sending hello..." << endl;
  struct hello hello;

  /* send hello message */
  if (streaming)
    hello.opCode = htons(CMD_HELLO_STREAMING);
  else
    hello.opCode = htons(CMD_HELLO);

  hello.reserved = 0;
  hello.capabilities = htonl(net_config->capabilities);
  udpc_copyToMessage(hello.mcastAddr, &net_config->dataMcastAddr);
  hello.blockSize = htons(net_config->blockSize);
  // TODO: FIXME
  // rgWaitAll(net_config, sock, net_config->controlMcastAddr.sin_addr.s_addr, sizeof(hello));
  BCAST_CONTROL(sock, hello);
}

char* udpc_getIpString(struct sockaddr_in* addr, char* buffer)
{
  long iaddr = htonl(getSinAddr(addr).s_addr);
  sprintf(buffer, "%ld.%ld.%ld.%ld", (iaddr >> 24) & 0xff, (iaddr >> 16) & 0xff, (iaddr >> 8) & 0xff,
          iaddr & 0xff);
  return buffer;
}

net_if_t* udpc_getNetIf(const char* wanted)
{
#ifndef __MINGW32__
  struct ifreq ibuf[100];
  struct ifreq *ifrp, *ifend, *chosen;
  struct ifconf ifc;
  int s;
#else  /* __MINGW32__ */
  int i;

  int etherNo = -1;
  int wantedEtherNo = -2; /* Wanted ethernet interface */

  MIB_IPADDRTABLE* iptab = NULL;
  MIB_IFTABLE* iftab = NULL;

  MIB_IPADDRROW *iprow, *chosen = NULL;
  MIB_IFROW* chosenIf = NULL;
  WORD wVersionRequested; /* Version of Winsock to load */
  WSADATA wsaData;        /* Winsock implementation details */
  ULONG a;

  int r;
#endif /* __MINGW32__ */

  int lastGoodness = 0;
  struct in_addr wantedAddress;
  int isAddress = 0;
  int wantedLen = 0;
  net_if_t* net_if;

  if (wanted == NULL)
  {
    wanted = getenv("IFNAME");
  }

  if (wanted && INET_ATON(wanted, &wantedAddress))
    isAddress = 1;
  else
    wantedAddress.s_addr = 0;

  if (wanted)
    wantedLen = strlen(wanted);

  net_if = MALLOC(net_if_t);
  // TODO: FIXME
  // if(net_if == NULL)
  //    udpc_fatal(1, "Out of memory error");

#ifndef __MINGW32__

  s = socket(PF_INET, SOCK_DGRAM, 0);

  if (s < 0)
  {
    perror("make socket");
    exit(1);
  }

  ifc.ifc_len = sizeof(ibuf);
  ifc.ifc_buf = (caddr_t)ibuf;

  if (ioctl(s, SIOCGIFCONF, (char*)&ifc) < 0 || ifc.ifc_len < (signed int)sizeof(struct ifreq))
  {
    perror("udpcast: SIOCGIFCONF: ");
    exit(1);
  }

  ifend = (struct ifreq*)((char*)ibuf + ifc.ifc_len);
  chosen = NULL;

  for (ifrp = ibuf; ifrp < ifend;
#ifdef IFREQ_SIZE
       ifrp = IFREQ_SIZE(*ifrp) + (char*)ifrp
#else
       ifrp++
#endif
  )
  {
    unsigned long iaddr = getSinAddr(&ifrp->ifr_addr).s_addr;
    int goodness;

    if (ifrp->ifr_addr.sa_family != PF_INET)
      continue;

    if (wanted)
    {
      if (isAddress && iaddr == wantedAddress.s_addr)
      {
        goodness = 8;
      }
      else if (strcmp(wanted, ifrp->ifr_name) == 0)
      {
        /* perfect match on interface name */
        goodness = 12;
      }
      else if (wanted != NULL && strncmp(wanted, ifrp->ifr_name, wantedLen) == 0)
      {
        /* prefix match on interface name */
        goodness = 7;
      }
      else
      {
        /* no match, try next */
        continue;
      }
    }
    else
    {
      if (iaddr == 0)
      {
        /* disregard interfaces whose address is zero */
        goodness = 1;
      }
      else if (iaddr == htonl(0x7f000001))
      {
        /* disregard localhost type devices */
        goodness = 2;
      }
      else if (strcmp("eth0", ifrp->ifr_name) == 0 || strcmp("en0", ifrp->ifr_name) == 0)
      {
        /* prefer first ethernet interface */
        goodness = 6;
      }
      else if (strncmp("eth0:", ifrp->ifr_name, 5) == 0)
      {
        /* second choice: any secondary addresses of first ethernet */
        goodness = 5;
      }
      else if (strncmp("eth", ifrp->ifr_name, 3) == 0 || strncmp("en", ifrp->ifr_name, 2) == 0)
      {
        /* and, if not available, any other ethernet device */
        goodness = 4;
      }
      else
      {
        goodness = 3;
      }
    }

    if (hasLink(s, ifrp->ifr_name))
      /* Good or unknown link status privileged over known
       * disconnected */
      goodness += 3;

    /* If all else is the same, prefer interfaces that
     * have broadcast */
    goodness = goodness * 2;

    if (goodness >= lastGoodness)
    {
      /* Privilege broadcast-enabled interfaces */
      if (ioctl(s, SIOCGIFBRDADDR, ifrp) < 0)
      {
        // TODO: FIXME
        // udpc_fatal(-1, "Error getting broadcast address for %s: %s", ifrp->ifr_name, strerror(errno));
      }

      if (getSinAddr(&ifrp->ifr_ifru.ifru_broadaddr).s_addr)
        goodness++;
    }

    if (goodness > lastGoodness)
    {
      chosen = ifrp;
      lastGoodness = goodness;
      net_if->addr.s_addr = iaddr;
    }
  }

  if (!chosen)
  {
    fprintf(stderr, "No suitable network interface found\n");
    fprintf(stderr, "The following interfaces are available:\n");

    for (ifrp = ibuf; ifrp < ifend;
#ifdef IFREQ_SIZE
         ifrp = IFREQ_SIZE(*ifrp) + (char*)ifrp
#else
         ifrp++
#endif
    )
    {
      char buffer[16];

      if (ifrp->ifr_addr.sa_family != PF_INET)
        continue;

      fprintf(stderr, "\t%s\t%s\n", ifrp->ifr_name,
              udpc_getIpString((struct sockaddr_in*)&ifrp->ifr_addr, buffer));
    }

    exit(1);
  }

  net_if->name = strdup(chosen->ifr_name);

#ifdef HAVE_STRUCT_IP_MREQN_IMR_IFINDEX

  /* Index for multicast subscriptions */
  if (ioctl(s, SIOCGIFINDEX, chosen) < 0)
  {
    // TODO: FIXME
    // udpc_fatal(-1, "Error getting index for %s: %s", net_if->name, strerror(errno));
  }

  net_if->index = chosen->ifr_ifindex;
#endif

  /* Broadcast */
  if (ioctl(s, SIOCGIFBRDADDR, chosen) < 0)
  {
    // TODO: FIXME
    // udpc_fatal(-1, "Error getting broadcast address for %s: %s", net_if->name, strerror(errno));
  }

  net_if->bcast = getSinAddr(&chosen->ifr_ifru.ifru_broadaddr);

  close(s);

#else  /* __MINGW32__ */

  /* WINSOCK initialization */
  wVersionRequested = MAKEWORD(2, 0); /* Request Winsock v2.0 */

  if (WSAStartup(wVersionRequested, &wsaData) != 0) /* Load Winsock DLL */
  {
    fprintf(stderr, "WSAStartup() failed");
    exit(1);
  }

  /* End WINSOCK initialization */

  a = 0;
  r = GetIpAddrTable(iptab, &a, TRUE);
  iptab = malloc(a);
  r = GetIpAddrTable(iptab, &a, TRUE);

  a = 0;
  r = GetIfTable(iftab, &a, TRUE);
  iftab = malloc(a);
  r = GetIfTable(iftab, &a, TRUE);

  if (wanted && !strncmp(wanted, "eth", 3) && wanted[3])
  {
    char* ptr;
    int n = strtoul(wanted + 3, &ptr, 10);

    if (!*ptr)
      wantedEtherNo = n;
  }

  for (i = 0; i < iptab->dwNumEntries; i++)
  {
    int goodness = -1;
    unsigned long iaddr;
    int isEther = 0;
    MIB_IFROW* ifrow;

    iprow = &iptab->table[i];
    iaddr = iprow->dwAddr;

    ifrow = getIfRow(iftab, iprow->dwIndex);

    if (ifrow && ifrow->dwPhysAddrLen == 6 && iprow->dwBCastAddr)
    {
      isEther = 1;
      etherNo++;
    }

    if (wanted)
    {
      if (isAddress && iaddr == wantedAddress.s_addr)
      {
        goodness = 8;
      }
      else if (isEther && wantedEtherNo == etherNo)
      {
        goodness = 9;
      }
      else if (ifrow->dwPhysAddrLen)
      {
        int j;
        const char* ptr = wanted;

        for (j = 0; *ptr && j < ifrow->dwPhysAddrLen; j++)
        {
          int digit = strtoul(ptr, (char**)&ptr, 16);

          if (digit != ifrow->bPhysAddr[j])
            break; /* Digit mismatch */

          if (*ptr == '-' || *ptr == ':')
          {
            ptr++;
          }
        }

        if (!*ptr && j == ifrow->dwPhysAddrLen)
        {
          goodness = 9;
        }
      }
    }
    else
    {
      if (iaddr == 0)
      {
        /* disregard interfaces whose address is zero */
        goodness = 1;
      }
      else if (iaddr == htonl(0x7f000001))
      {
        /* disregard localhost type devices */
        goodness = 2;
      }
      else if (isEther)
      {
        /* prefer ethernet */
        goodness = 6;
      }
      else if (ifrow->dwPhysAddrLen)
      {
        /* then prefer interfaces which have a physical address */
        goodness = 4;
      }
      else
      {
        goodness = 3;
      }
    }

    goodness = goodness * 2;

    /* If all else is the same, prefer interfaces that
     * have broadcast */
    if (goodness >= lastGoodness)
    {
      /* Privilege broadcast-enabled interfaces */
      if (iprow->dwBCastAddr)
        goodness++;
    }

    if (goodness > lastGoodness)
    {
      chosen = iprow;
      chosenIf = ifrow;
      lastGoodness = goodness;
    }
  }

  if (!chosen)
  {
    fprintf(stderr, "No suitable network interface found%s%s\n", wanted ? " for " : "", wanted ? wanted : "");
    fprintf(stderr, "The following interfaces are available:\n");

    for (i = 0; i < iptab->dwNumEntries; i++)
    {
      char buffer[16];
      struct sockaddr_in addr;
      MIB_IFROW* ifrow;
      char* name = NULL;
      iprow = &iptab->table[i];
      addr.sin_addr.s_addr = iprow->dwAddr;
      ifrow = getIfRow(iftab, iprow->dwIndex);
      name = fmtName(ifrow);
      fprintf(stderr, " %15s  %s\n", udpc_getIpString(&addr, buffer), name ? name : "");

      if (name)
        free(name);
    }

    exit(1);
  }

  net_if->bcast.s_addr = net_if->addr.s_addr = chosen->dwAddr;

  if (chosen->dwBCastAddr)
    net_if->bcast.s_addr |= ~chosen->dwMask;

  if (chosenIf)
  {
    net_if->name = fmtName(chosenIf);
  }
  else
  {
    net_if->name = "*";
  }

  free(iftab);
  free(iptab);
#endif /* __MINGW32__ */

  return net_if;
}

#define IP_MREQN ip_mreqn

int fillMreq(net_if_t* net_if, struct in_addr addr, struct IP_MREQN* mreq)
{
#ifdef HAVE_STRUCT_IP_MREQN_IMR_IFINDEX
  mreq->imr_ifindex = net_if->index;
  mreq->imr_address.s_addr = 0;
#else
  mreq->imr_interface = net_if->addr;
#endif
  mreq->imr_multiaddr = addr;

  return 0;
}

int mcastOp(int sock, net_if_t* net_if, struct in_addr addr, int code, const char* message)
{
  struct IP_MREQN mreq;
  int r;

  fillMreq(net_if, addr, &mreq);
  r = setsockopt(sock, SOL_IP, code, (char*)&mreq, sizeof(mreq));

  if (r < 0)
  {
    perror(message);
    exit(1);
  }

  return 0;
}

int udpc_setMcastDestination(int sock, net_if_t* net_if, struct sockaddr_in* addr)
{
#ifdef WINDOWS
  int r;
  struct sockaddr_in interface_addr;
  struct in_addr if_addr;
  getMyAddress(net_if, &interface_addr);
  if_addr = getSinAddr(&interface_addr);
  r = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF, (char*)&if_addr, sizeof(if_addr));

  if (r < 0)
    fatal(1, "Set multicast send interface");

  return 0;
#else
  /* IP_MULTICAST_IF not correctly supported on Cygwin */
  return mcastOp(sock, net_if, getSinAddr(addr), IP_MULTICAST_IF, "Set multicast send interface");
#endif
}

int initSockAddress(addr_type_t addr_type, net_if_t* net_if, in_addr_t ip, unsigned short port,
                    struct sockaddr_in* addr)
{
  memset((char*)addr, 0, sizeof(struct sockaddr_in));
  addr->sin_family = AF_INET;
  addr->sin_port = htons(port);

  // TODO: FIXME
  // if(!net_if && addr_type != ADDR_TYPE_MCAST)
  // udpc_fatal(1, "initSockAddr without ifname\n");

  switch (addr_type)
  {
    case ADDR_TYPE_UCAST: addr->sin_addr = net_if->addr; break;

    case ADDR_TYPE_BCAST: addr->sin_addr = net_if->bcast; break;

    case ADDR_TYPE_MCAST: addr->sin_addr.s_addr = ip; break;
  }

  return 0;
}

int mcastListen(int sock, net_if_t* net_if, struct sockaddr_in* addr)
{
  return mcastOp(sock, net_if, getSinAddr(addr), IP_ADD_MEMBERSHIP, "Subscribe to multicast group");
}

int udpc_makeSocket(addr_type_t addr_type, net_if_t* net_if, struct sockaddr_in* tmpl, int port)
{
  int ret, s;
  struct sockaddr_in myaddr;
  in_addr_t ip = 0;

#ifdef WINDOWS
  static int lastSocket = -1;
  /* Very ugly hack, but hey!, this is for Windows */

  if (addr_type == ADDR_TYPE_MCAST)
  {
    mcastListen(lastSocket, net_if, tmpl);
    return -1;
  }
  else if (addr_type != ADDR_TYPE_UCAST)
    return -1;

#endif

  s = socket(PF_INET, SOCK_DGRAM, 0);

  if (s < 0)
  {
    perror("make socket");
    exit(1);
  }

  if (addr_type == ADDR_TYPE_MCAST && tmpl != NULL)
  {
    ip = tmpl->sin_addr.s_addr;
  }

  ret = initSockAddress(addr_type, net_if, ip, port, &myaddr);

  // TODO: FIXME
  // if(ret < 0)
  //	udpc_fatal(1, "Could not get socket address fot %d/%s",
  //		   addr_type, net_if->name);
  if (addr_type == ADDR_TYPE_BCAST)
  {
    // cerr << "for addr_type == ADDR_TYPE_BCAST, myaddr.sin_addr.s_addr = 0x" << hex <<
    // myaddr.sin_addr.s_addr << dec << endl;
  }

  if (addr_type == ADDR_TYPE_BCAST && myaddr.sin_addr.s_addr == 0)
  {
    /* Attempting to bind to broadcast address on not-broadcast media ... */
    closesocket(s);
    return -1;
  }

  ret = bind(s, (struct sockaddr*)&myaddr, sizeof(myaddr));
  // TODO: FIXME
  //    if (ret < 0) {
  //	char buffer[16];
  //	udpc_fatal(1, "bind socket to %s:%d (%s)\n",
  //		   udpc_getIpString(&myaddr, buffer),
  //		   udpc_getPort(&myaddr),
  //		   strerror(errno));
  //    }

  if (addr_type == ADDR_TYPE_MCAST)
    mcastListen(s, net_if, &myaddr);

#ifdef WINDOWS
  lastSocket = s;
#endif
  return s;
}

int udpc_setSocketToBroadcast(int sock)
{
  /* set the socket to broadcast */
  int p = 1;
  return setsockopt(sock, SOL_SOCKET, SO_BROADCAST, (char*)&p, sizeof(int));
}

int udpc_getBroadCastAddress(net_if_t* net_if, struct sockaddr_in* addr, short port)
{
  int r = initSockAddress(ADDR_TYPE_BCAST, net_if, INADDR_ANY, port, addr);

  if (addr->sin_addr.s_addr == 0)
  {
    /* Quick hack to make it work for loopback */
    struct sockaddr_in ucast;
    initSockAddress(ADDR_TYPE_UCAST, net_if, INADDR_ANY, port, &ucast);

    if ((ntohl(ucast.sin_addr.s_addr) & 0xff000000) == 0x7f000000)
      addr->sin_addr.s_addr = ucast.sin_addr.s_addr;
  }

  return r;
}

int safe_inet_aton(const char* address, struct in_addr* ip)
{
  if (!INET_ATON(address, ip))
  {
    // TODO: FIXME
    // udpc_fatal(-1, "Bad address %s", address);
  }

  return 0;
}

int udpc_getMcastAllAddress(struct sockaddr_in* addr, const char* address, short port)
{
  struct in_addr ip;
  int ret;

  if (address == NULL || address[0] == '\0')
    safe_inet_aton("224.0.0.1", &ip);
  else
  {
    if ((ret = safe_inet_aton(address, &ip)) < 0)
      return ret;
  }

  return initSockAddress(ADDR_TYPE_MCAST, NULL, ip.s_addr, port, addr);
}

void setPort(struct sockaddr_in* addr, unsigned short port)
{
  ((struct sockaddr_in*)addr)->sin_port = htons(port);
}

int isMcastAddress(struct sockaddr_in* addr)
{
  int ip = ntohl(addr->sin_addr.s_addr) >> 24;
  return ip >= 0xe0 && ip < 0xf0;
}

void udpc_clearIp(struct sockaddr_in* addr)
{
  addr->sin_addr.s_addr = 0;
  addr->sin_family = AF_INET;
}

void udpc_setSendBuf(int sock, unsigned int bufsize)
{
  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char*)&bufsize, sizeof(bufsize)) < 0)
    perror("Set send buffer");
}

int udpc_setTtl(int sock, int ttl)
{
  /* set the socket to broadcast */
  return setsockopt(sock, SOL_IP, IP_MULTICAST_TTL, (char*)&ttl, sizeof(int));
}

int udpc_getMyAddress(net_if_t* net_if, struct sockaddr_in* addr)
{
  return initSockAddress(ADDR_TYPE_UCAST, net_if, INADDR_ANY, 0, addr);
}

void udpc_getDefaultMcastAddress(net_if_t* net_if, struct sockaddr_in* mcast)
{
  udpc_getMyAddress(net_if, mcast);
  mcast->sin_addr.s_addr &= htonl(0x07ffffff);
  mcast->sin_addr.s_addr |= htonl(0xe8000000);
}

void udpc_copyIpFrom(struct sockaddr_in* dst, struct sockaddr_in* src)
{
  dst->sin_addr = src->sin_addr;
  dst->sin_family = src->sin_family;
}

int udpc_getSelectedSock(int* socks, int nr, fd_set* read_set)
{
  int i;
  int maxFd;
  maxFd = -1;

  for (i = 0; i < nr; i++)
  {
    if (socks[i] == -1)
      continue;

    if (FD_ISSET(socks[i], read_set))
      return socks[i];
  }

  return -1;
}

int udpc_ipIsEqual(struct sockaddr_in* left, struct sockaddr_in* right)
{
  return getSinAddr(left).s_addr == getSinAddr(right).s_addr;
}

void udpc_closeSock(int* socks, int nr, int target)
{
  int i;
  int sock = socks[target];

  socks[target] = -1;

  for (i = 0; i < nr; i++)
    if (socks[i] == sock)
      return;

  closesocket(sock);
}
int prepareForSelect(int* socks, int nr, fd_set* read_set)
{
  int i;
  int maxFd;
  FD_ZERO(read_set);
  maxFd = -1;

  for (i = 0; i < nr; i++)
  {
    if (socks[i] == -1)
      continue;

    FD_SET(socks[i], read_set);

    if (socks[i] > maxFd)
      maxFd = socks[i];
  }

  return maxFd;
}

int doReceive(int s, void* message, size_t len, struct sockaddr_in* from, int portBase)
{
  socklen_t slen;
  int r;
  unsigned short port;

  slen = sizeof(*from);
#ifdef LOSSTEST
  loseRecvPacket(s);
#endif
  r = recvfrom(s, message, len, 0, (struct sockaddr*)from, &slen);

  if (r < 0)
    return r;

  port = ntohs(from->sin_port);

  if (port != RECEIVER_PORT(portBase) && port != SENDER_PORT(portBase))
  {
    return -1;
  }

  /*    flprintf("recv: %08x %d\n", *(int*) message, r);*/
  return r;
}

participantsDb_t udpc_makeParticipantsDb(void)
{
  return MALLOC(struct participantsDb);
}

int udpc_removeParticipant(struct participantsDb* db, int i)
{
  if (db->clientTable[i].used)
  {
    db->clientTable[i].used = 0;
    db->nrParticipants--;
  }

  return 0;
}

int udpc_lookupParticipant(struct participantsDb* db, struct sockaddr_in* addr)
{
  int i;

  for (i = 0; i < MAX_CLIENTS; i++)
  {
    if (db->clientTable[i].used && udpc_ipIsEqual(&db->clientTable[i].addr, addr))
    {
      return i;
    }
  }

  return -1;
}

int udpc_nrParticipants(participantsDb_t db)
{
  return db->nrParticipants;
}

int udpc_addParticipant(participantsDb_t db, struct sockaddr_in* addr, int capabilities, unsigned int rcvbuf,
                        int pointopoint)
{
  // cerr << "adding a participant..." << endl;
  int i;

  if ((i = udpc_lookupParticipant(db, addr)) >= 0)
    return i;

  for (i = 0; i < MAX_CLIENTS; i++)
  {
    if (!db->clientTable[i].used)
    {
      db->clientTable[i].addr = *addr;
      db->clientTable[i].used = 1;
      db->clientTable[i].capabilities = capabilities;
      db->clientTable[i].rcvbuf = rcvbuf;
      db->nrParticipants++;

      return i;
    }
    else if (pointopoint)
      return -1;
  }

  return -1; /* no space left in participant's table */
}

int selectWithoutConsole(int maxFd, fd_set* read_set, struct timeval* tv)
{
  int ret;

  ret = select(maxFd, read_set, NULL, NULL, tv);

  if (ret < 0)
    return -1;

  return ret;
}

#if 0
void sendHello(struct net_config* net_config, int sock,
               int streaming)
{
    struct hello hello;

    /* send hello message */
    if (streaming)
        hello.opCode = htons(CMD_HELLO_STREAMING);
    else
        hello.opCode = htons(CMD_HELLO);

    hello.reserved = 0;
    hello.capabilities = htonl(net_config->capabilities);
    udpc_copyToMessage(hello.mcastAddr, &net_config->dataMcastAddr);
    hello.blockSize = htons(net_config->blockSize);
    //rgWaitAll(net_config, sock, net_config->controlMcastAddr.sin_addr.s_addr, sizeof(hello));
    BCAST_CONTROL(sock, hello);
}
#endif

int checkClientWait(participantsDb_t db, struct net_config* net_config, time_t* firstConnected)
{
  time_t now;

  if (!udpc_nrParticipants(db) || !firstConnected || !*firstConnected)
    return 0; /* do not start: no receivers */

  now = time(0);

  /*
   * If we have a max_client_wait, start the transfer after first client
   * connected + maxSendWait
   */
  if (net_config->max_receivers_wait && (now >= *firstConnected + net_config->max_receivers_wait))
  {
#ifdef USE_SYSLOG
    syslog(LOG_INFO, "max wait[%d] passed: starting", net_config->max_receivers_wait);
#endif
    return 1; /* send-wait passed: start */
  }

  /*
   * Otherwise check to see if the minimum of clients
   *  have checked in.
   */
  else if (udpc_nrParticipants(db) >= net_config->min_receivers &&
           /*
            *  If there are enough clients and there's a min wait time, we'll
            *  wait around anyway until then.
            *  Otherwise, we always transfer
            */
           (!net_config->min_receivers_wait || now >= *firstConnected + net_config->min_receivers_wait))
  {
#ifdef USE_SYSLOG
    syslog(LOG_INFO, "min receivers[%d] reached: starting", net_config->min_receivers);
#endif
    return 1;
  }
  else
    return 0;
}

int sendConnectionReply(participantsDb_t db, int sock, struct net_config* config, struct sockaddr_in* client,
                        int capabilities, unsigned int rcvbuf)
{
  struct connectReply reply;

  if (rcvbuf == 0)
    rcvbuf = 65536;

  if (capabilities & CAP_BIG_ENDIAN)
  {
    reply.opCode = htons(CMD_CONNECT_REPLY);
    reply.clNr =
        htonl(udpc_addParticipant(db, client, capabilities, rcvbuf, config->flags & FLAG_POINTOPOINT));
    reply.blockSize = htonl(config->blockSize);
  }
  else
  {
    // TODO: FIXME
    // udpc_fatal(1, "Little endian protocol no longer supported");
  }

  reply.reserved = 0;

  if (config->flags & FLAG_POINTOPOINT)
  {
    udpc_copyIpFrom(&config->dataMcastAddr, client);
  }

  /* new parameters: always big endian */
  reply.capabilities = ntohl(config->capabilities);
  udpc_copyToMessage(reply.mcastAddr, &config->dataMcastAddr);

  /*reply.mcastAddress = mcastAddress;*/
  // rgWaitAll(config, sock, client->sin_addr.s_addr, sizeof(reply));
  if (SEND(sock, reply, *client) < 0)
  {
    perror("reply add new client");
    return -1;
  }

  return 0;
}

int mainDispatcher(int* fd, int nr, participantsDb_t db, struct net_config* net_config, int* tries,
                   time_t* firstConnected)
{
  struct sockaddr_in client;
  union message fromClient;
  fd_set read_set;
  int ret;
  int msgLength;
  int startNow = 0;
  int selected;

  if (firstConnected && !*firstConnected && udpc_nrParticipants(db))
  {
    *firstConnected = time(0);
#ifdef USE_SYSLOG
    syslog(LOG_INFO, "first connection: min wait[%d] secs - max wait[%d] - min clients[%d]",
           net_config->min_receivers_wait, net_config->max_receivers_wait, net_config->min_receivers);
#endif
  }

  while (!startNow)
  {
    struct timeval tv;
    struct timeval* tvp;
    int nr_desc;

    int maxFd = prepareForSelect(fd, nr, &read_set);

    if (net_config->rexmit_hello_interval)
    {
      tv.tv_usec = (net_config->rexmit_hello_interval % 1000) * 1000;
      tv.tv_sec = net_config->rexmit_hello_interval / 1000;
      tvp = &tv;
    }
    else if (firstConnected && udpc_nrParticipants(db))
    {
      tv.tv_usec = 0;
      tv.tv_sec = 2;
      tvp = &tv;
    }
    else
      tvp = 0;

    nr_desc = selectWithoutConsole(maxFd + 1, &read_set, tvp);

    if (nr_desc < 0)
    {
      perror("select");
      return -1;
    }

    if (nr_desc > 0)
      /* key pressed, or receiver activity */
      break;

    if (net_config->rexmit_hello_interval)
    {
      /* retransmit hello message */
      udpc_sendHello(net_config, fd[0], 0);
      (*tries)++;

      if (net_config->autostart != 0 && *tries > net_config->autostart)
        startNow = 1;
    }

    if (firstConnected)
      startNow = startNow || checkClientWait(db, net_config, firstConnected);
  }

  selected = udpc_getSelectedSock(fd, nr, &read_set);

  if (selected == -1)
    return startNow;

  BZERO(fromClient); /* Zero it out in order to cope with short messages
                      * from older versions */

  msgLength = RECV(selected, fromClient, client, net_config->portBase);

  if (msgLength < 0)
  {
    perror("problem getting data from client");
    return 0; /* don't panic if we get weird messages */
  }

  if (net_config->flags & FLAG_ASYNC)
    return 0;

  switch (ntohs(fromClient.opCode))
  {
    case CMD_CONNECT_REQ:
      sendConnectionReply(db, fd[0], net_config, &client,
                          CAP_BIG_ENDIAN | ntohl(fromClient.connectReq.capabilities),
                          ntohl(fromClient.connectReq.rcvbuf));
      return startNow;

    case CMD_GO: return 1;

    case CMD_DISCONNECT:
      ret = udpc_lookupParticipant(db, &client);

      if (ret >= 0)
        udpc_removeParticipant(db, ret);

      return startNow;

    default: break;
  }

  return startNow;
}

static int isPointToPoint(participantsDb_t db, int flags)
{
  if (flags & FLAG_POINTOPOINT)
    return 1;

  if (flags & (FLAG_NOPOINTOPOINT | FLAG_ASYNC))
    return 0;

  return udpc_nrParticipants(db) == 1;
}

int getProducedAmount(produconsum_t pc)
{
  unsigned int produced = pc->produced;
  unsigned int consumed = pc->consumed;

  if (produced < consumed)
    return produced + 2 * pc->size - consumed;
  else
    return produced - consumed;
}

int _consumeAny(produconsum_t pc, unsigned int minAmount, struct timespec* ts)
{
  unsigned int amount;
#if DEBUG
  flprintf("%s: Waiting for %d bytes (%d:%d)\n", pc->name, minAmount, pc->consumed, pc->produced);
#endif
  pc->consumerIsWaiting = 1;
  amount = getProducedAmount(pc);

  if (amount >= minAmount || pc->atEnd)
  {
    pc->consumerIsWaiting = 0;
#if DEBUG
    flprintf("%s: got %d bytes\n", pc->name, amount);
#endif
    return amount;
  }

  pthread_mutex_lock(&pc->mutex);

  while ((amount = getProducedAmount(pc)) < minAmount && !pc->atEnd)
  {
#if DEBUG
    flprintf("%s: ..Waiting for %d bytes (%d:%d)\n", pc->name, minAmount, pc->consumed, pc->produced);
#endif

    if (ts == 0)
      pthread_cond_wait(&pc->cond, &pc->mutex);
    else
    {
      int r;
#if DEBUG
      flprintf("Before timed wait\n");
#endif
      r = pthread_cond_timedwait(&pc->cond, &pc->mutex, ts);
#if DEBUG
      flprintf("After timed wait %d\n", r);
#endif

      if (r == ETIMEDOUT)
      {
        amount = getProducedAmount(pc);
        break;
      }
    }
  }

  pthread_mutex_unlock(&pc->mutex);
#if DEBUG
  flprintf("%s: Got them %d (for %d) %d\n", pc->name, amount, minAmount, pc->atEnd);
#endif
  pc->consumerIsWaiting = 0;
  return amount;
}

produconsum_t pc_makeProduconsum(int size, const char* name)
{
  produconsum_t pc = MALLOC(struct produconsum);
  pc->size = size;
  pc->produced = 0;
  pc->consumed = 0;
  pc->atEnd = 0;
  pthread_mutex_init(&pc->mutex, NULL);
  pc->consumerIsWaiting = 0;
  pthread_cond_init(&pc->cond, NULL);
  pc->name = name;
  return pc;
}

int pc_consumeAnyWithTimeout(produconsum_t pc, struct timespec* ts)
{
  return _consumeAny(pc, 1, ts);
}

unsigned int pc_getProducerPosition(produconsum_t pc)
{
  return pc->produced % pc->size;
}

unsigned int pc_getWaiting(produconsum_t pc)
{
  return getProducedAmount(pc);
}

int pc_consumeAny(produconsum_t pc)
{
  return _consumeAny(pc, 1, 0);
}

int pc_consume(produconsum_t pc, int amount)
{
  return _consumeAny(pc, amount, 0);
}

void wakeConsumer(produconsum_t pc)
{
  if (pc->consumerIsWaiting)
  {
    pthread_mutex_lock(&pc->mutex);
    pthread_cond_signal(&pc->cond);
    pthread_mutex_unlock(&pc->mutex);
  }
}

void pc_produceEnd(produconsum_t pc)
{
  pc->atEnd = 1;
  wakeConsumer(pc);
}

int pc_consumed(produconsum_t pc, int amount)
{
  unsigned int consumed = pc->consumed;

  if (consumed >= 2 * pc->size - amount)
  {
    consumed += amount - 2 * pc->size;
  }
  else
  {
    consumed += amount;
  }

  pc->consumed = consumed;
  return amount;
}

unsigned int pc_getConsumerPosition(produconsum_t pc)
{
  return pc->consumed % pc->size;
}

void pc_produce(produconsum_t pc, unsigned int amount)
{
  unsigned int produced = pc->produced;
  unsigned int consumed = pc->consumed;

  /* sanity checks:
   * 1. should not produce more than size
   * 2. do not pass consumed+size
   */
  if (amount > pc->size)
  {
    // TODO: FIXME
    // udpc_fatal(1, "Buffer overflow in produce %s: %d > %d \n", pc->name, amount, pc->size);
  }

  produced += amount;

  if (produced >= 2 * pc->size)
    produced -= 2 * pc->size;

  if (produced > consumed + pc->size || (produced < consumed && produced > consumed - pc->size))
  {
    // TODO: FIXME
    // udpc_fatal(1, "Buffer overflow in produce %s: %d > %d [%d] \n", pc->name, produced, consumed,
    // pc->size);
  }

  pc->produced = produced;
  wakeConsumer(pc);
}

void udpc_initFifo(struct fifo* fifo, int blockSize)
{
  fifo->dataBufSize = blockSize * 4096;
  fifo->dataBuffer = (unsigned char*)malloc(fifo->dataBufSize + 4096);
  fifo->dataBuffer += 4096 - (((unsigned long)fifo->dataBuffer) % 4096);

  /* Free memory queue is initially full */
  fifo->freeMemQueue = pc_makeProduconsum(fifo->dataBufSize, "free mem");
  pc_produce(fifo->freeMemQueue, fifo->dataBufSize);

  fifo->data = pc_makeProduconsum(fifo->dataBufSize, "receive");
}

THREAD_RETURN returnChannelMain(void* args)
{
  struct returnChannel* returnChannel = (struct returnChannel*)args;

  while (1)
  {
    struct sockaddr_in from;
    int clNo;
    int pos = pc_getConsumerPosition(returnChannel->freeSpace);
    pc_consumeAny(returnChannel->freeSpace);

    RECV(returnChannel->rcvSock, returnChannel->q[pos].msg, from, returnChannel->config->portBase);
    clNo = udpc_lookupParticipant(returnChannel->participantsDb, &from);

    if (clNo < 0)
    {
      /* packet from unknown provenance */
      continue;
    }

    returnChannel->q[pos].clNo = clNo;
    pc_consumed(returnChannel->freeSpace, 1);
    pc_produce(returnChannel->incoming, 1);
  }

  return 0;
}

void initReturnChannel(struct returnChannel* returnChannel, struct net_config* config, int sock)
{
  returnChannel->config = config;
  returnChannel->rcvSock = sock;
  returnChannel->freeSpace = pc_makeProduconsum(QUEUE_SIZE, "msg:free-queue");
  pc_produce(returnChannel->freeSpace, QUEUE_SIZE);
  returnChannel->incoming = pc_makeProduconsum(QUEUE_SIZE, "msg:incoming");

  pthread_create(&returnChannel->thread, NULL, returnChannelMain, returnChannel);
}

void senderStatsAddBytes(sender_stats_t ss, long bytes)
{
  if (ss != NULL)
  {
    ss->totalBytes += bytes;

    if (ss->bwPeriod)
    {
      double tdiff, bw;
      struct timeval tv;
      gettimeofday(&tv, 0);
      ss->periodBytes += bytes;

      if (tv.tv_sec - ss->periodStart.tv_sec < ss->bwPeriod - 1)
        return;

      tdiff = (tv.tv_sec - ss->periodStart.tv_sec) * 1000000.0 + tv.tv_usec - ss->periodStart.tv_usec;

      if (tdiff < ss->bwPeriod * 1000000.0)
        return;

      bw = ss->periodBytes * 8.0 / tdiff;
      ss->periodBytes = 0;
      ss->periodStart = tv;
    }
  }
}
int ackSlice(struct slice* slice, struct net_config* net_config, struct fifo* fifo, sender_stats_t stats)
{
  if (slice->state == slice::SLICE_ACKED)
    /* already acked */
    return 0;

  if (!(net_config->flags & FLAG_SN))
  {
    if (net_config->discovery == net_config::DSC_DOUBLING)
    {
      net_config->sliceSize += net_config->sliceSize / 4;

      if (net_config->sliceSize >= net_config->max_slice_size)
      {
        net_config->sliceSize = net_config->max_slice_size;
        net_config->discovery = net_config::DSC_REDUCING;
      }
    }
  }

  slice->state = slice::SLICE_ACKED;
  pc_produce(fifo->freeMemQueue, slice->bytes);

  /* Statistics */
  senderStatsAddBytes(stats, slice->bytes);

  /* End Statistics */

  return 0;
}

int isSliceAcked(struct slice* slice)
{
  if (slice->state == slice::SLICE_ACKED)
  {
    return 1;
  }
  else
  {
    return 0;
  }
}

int freeSlice(sender_state_t sendst, struct slice* slice)
{
  int i;
  i = slice - sendst->slices;
  slice->state = slice::SLICE_PRE_FREE;

  while (1)
  {
    int pos = pc_getProducerPosition(sendst->free_slices_pc);

    if (sendst->slices[pos].state == slice::SLICE_PRE_FREE)
      sendst->slices[pos].state = slice::SLICE_FREE;
    else
      break;

    pc_produce(sendst->free_slices_pc, 1);
  }

  return 0;
}

int isSliceXmitted(struct slice* slice)
{
  if (slice->state == slice::SLICE_XMITTED)
  {
    return 1;
  }
  else
  {
    return 0;
  }
}

int getSliceBlocks(struct slice* slice, struct net_config* net_config)
{
  return (slice->bytes + net_config->blockSize - 1) / net_config->blockSize;
}

int sendReqack(struct slice* slice, struct net_config* net_config, struct fifo* fifo, sender_stats_t stats,
               int sock)
{
  /* in async mode, just confirm slice... */
  if ((net_config->flags & FLAG_ASYNC) && slice->bytes != 0)
  {
    ackSlice(slice, net_config, fifo, stats);
    return 0;
  }

  if ((net_config->flags & FLAG_ASYNC)
#ifdef BB_FEATURE_UDPCAST_FEC
      && (net_config->flags & FLAG_FEC)
#endif
  )
  {
    return 0;
  }

  if (!(net_config->flags & FLAG_SN) && slice->rxmitId != 0)
  {
    int nrBlocks;
    nrBlocks = getSliceBlocks(slice, net_config);

    if (slice->lastGoodBlock != 0 && slice->lastGoodBlock < nrBlocks)
    {
      net_config->discovery = net_config::DSC_REDUCING;

      if (slice->lastGoodBlock < net_config->sliceSize / 2)
      {
        net_config->sliceSize = net_config->sliceSize / 2;
      }
      else
      {
        net_config->sliceSize = slice->lastGoodBlock;
      }

      if (net_config->sliceSize < 32)
      {
        /* a minimum of 32 */
        net_config->sliceSize = 32;
      }
    }
  }

  slice->lastGoodBlock = 0;
  slice->sl_reqack.ra.opCode = htons(CMD_REQACK);
  slice->sl_reqack.ra.sliceNo = htonl(slice->sliceNo);
  slice->sl_reqack.ra.bytes = htonl(slice->bytes);

  slice->sl_reqack.ra.reserved = 0;
  memcpy((void*)&slice->answeredSet, (void*)&slice->sl_reqack.readySet, sizeof(slice->answeredSet));
  slice->nrAnswered = slice->nrReady;

  /* not everybody is ready yet */
  slice->needRxmit = 0;
  memset(slice->rxmitMap, 0, sizeof(slice->rxmitMap));
  memset(slice->isXmittedMap, 0, sizeof(slice->isXmittedMap));
  slice->sl_reqack.ra.rxmit = htonl(slice->rxmitId);

  // rgWaitAll(net_config, sock, net_config->dataMcastAddr.sin_addr.s_addr, sizeof(slice->sl_reqack));
  BCAST_DATA(sock, slice->sl_reqack);
  return 0;
}

struct slice* findSlice(struct slice* slice1, struct slice* slice2, int sliceNo)
{
  if (slice1 != NULL && slice1->sliceNo == sliceNo)
    return slice1;

  if (slice2 != NULL && slice2->sliceNo == sliceNo)
    return slice2;

  return NULL;
}

void markParticipantAnswered(slice_t slice, int clNo)
{
  if (BIT_ISSET(clNo, slice->answeredSet))
    /* client already has answered */
    return;

  slice->nrAnswered++;
  SET_BIT(clNo, slice->answeredSet);
}

int udpc_isParticipantValid(struct participantsDb* db, int i)
{
  return db->clientTable[i].used;
}

void senderSetAnswered(sender_stats_t ss, int clNo)
{
  if (ss != NULL)
    ss->clNo = clNo;
}

int handleOk(sender_state_t sendst, struct slice* slice, int clNo)
{
  if (slice == NULL)
    return 0;

  if (!udpc_isParticipantValid(sendst->rc.participantsDb, clNo))
  {
    // udpc_flprintf("Invalid participant %d\n", clNo);
    return 0;
  }

  if (BIT_ISSET(clNo, slice->sl_reqack.readySet))
  {
    /* client is already marked ready */
  }
  else
  {
    SET_BIT(clNo, slice->sl_reqack.readySet);
    slice->nrReady++;
    senderSetAnswered(sendst->stats, clNo);
    markParticipantAnswered(slice, clNo);
  }

  return 0;
}

int handleDisconnect1(struct slice* slice, int clNo)
{
  if (slice != NULL)
  {
    if (BIT_ISSET(clNo, slice->sl_reqack.readySet))
    {
      /* avoid counting client both as left and ready */
      CLR_BIT(clNo, slice->sl_reqack.readySet);
      slice->nrReady--;
    }

    if (BIT_ISSET(clNo, slice->answeredSet))
    {
      slice->nrAnswered--;
      CLR_BIT(clNo, slice->answeredSet);
    }
  }

  return 0;
}

int handleDisconnect(participantsDb_t db, struct slice* slice1, struct slice* slice2, int clNo)
{
  handleDisconnect1(slice1, clNo);
  handleDisconnect1(slice2, clNo);
  udpc_removeParticipant(db, clNo);
  return 0;
}

int handleRetransmit(sender_state_t sendst, struct slice* slice, int clNo, unsigned char* map, int rxmit)
{
  unsigned int i;

  if (!udpc_isParticipantValid(sendst->rc.participantsDb, clNo))
  {
    // udpc_flprintf("Invalid participant %d\n", clNo);
    return 0;
  }

  if (slice == NULL)
    return 0;

  if (rxmit < slice->rxmitId)
  {
    /* late answer to previous Req Ack */
    return 0;
  }

  for (i = 0; i < sizeof(slice->rxmitMap) / sizeof(char); i++)
  {
    slice->rxmitMap[i] |= ~map[i];
  }

  slice->needRxmit = 1;
  markParticipantAnswered(slice, clNo);
  return 0;
}
int handleNextMessage(sender_state_t sendst, struct slice* xmitSlice, struct slice* rexmitSlice)
{
  int pos = pc_getConsumerPosition(sendst->rc.incoming);
  union message* msg = &sendst->rc.q[pos].msg;
  int clNo = sendst->rc.q[pos].clNo;

  pc_consumeAny(sendst->rc.incoming);

  switch (ntohs(msg->opCode))
  {
    case CMD_OK: handleOk(sendst, findSlice(xmitSlice, rexmitSlice, ntohl(msg->ok.sliceNo)), clNo); break;

    case CMD_DISCONNECT: handleDisconnect(sendst->rc.participantsDb, xmitSlice, rexmitSlice, clNo); break;

    case CMD_RETRANSMIT:
      handleRetransmit(sendst, findSlice(xmitSlice, rexmitSlice, ntohl(msg->retransmit.sliceNo)), clNo,
                       msg->retransmit.map, msg->retransmit.rxmit);
      break;

    default:
      // TODO: FIXME
      // udpc_flprintf("Bad command %04x\n", (unsigned short) msg->opCode);
      break;
  }

  pc_consumed(sendst->rc.incoming, 1);
  pc_produce(sendst->rc.freeSpace, 1);
  return 0;
}

int sendRawData(int sock, struct net_config* config, char* header, int headerSize, unsigned char* data,
                int dataSize)
{
  struct iovec iov[2];
  struct msghdr hdr;
  int packetSize;
  int ret;

  iov[0].iov_base = header;
  iov[0].iov_len = headerSize;

  iov[1].iov_base = data;
  iov[1].iov_len = dataSize;

  hdr.msg_name = &config->dataMcastAddr;
  hdr.msg_namelen = sizeof(struct sockaddr_in);
  hdr.msg_iov = iov;
  hdr.msg_iovlen = 2;
  initMsgHdr(&hdr);

  packetSize = dataSize + headerSize;
  // rgWaitAll(config, sock, config->dataMcastAddr.sin_addr.s_addr, packetSize);
  ret = sendmsg(sock, &hdr, 0);

  if (ret < 0)
  {
    // TODO: FIXME
  }

  return 0;
}
int transmitDataBlock(sender_state_t sendst, struct slice* slice, int i)
{
  struct fifo* fifo = sendst->fifo;
  struct net_config* config = sendst->config;
  struct dataBlock msg;

  idbassert(i < MAX_SLICE_SIZE);

  msg.opCode = htons(CMD_DATA);
  msg.sliceNo = htonl(slice->sliceNo);
  msg.blockNo = htons(i);

  msg.reserved = 0;
  msg.reserved2 = 0;
  msg.bytes = htonl(slice->bytes);

  sendRawData(sendst->socket, config, (char*)&msg, sizeof(msg),
              fifo->dataBuffer + (slice->base + i * config->blockSize) % fifo->dataBufSize,
              config->blockSize);
  return 0;
}

void senderStatsAddRetransmissions(sender_stats_t ss, int retransmissions)
{
  if (ss != NULL)
  {
    ss->retransmissions += retransmissions;
  }
}

int sendSlice(sender_state_t sendst, struct slice* slice, int retransmitting)
{
  struct net_config* config = sendst->config;

  int nrBlocks, i, rehello;
#ifdef BB_FEATURE_UDPCAST_FEC
  int fecBlocks;
#endif
  int retransmissions = 0;

  if (retransmitting)
  {
    slice->nextBlock = 0;

    if (slice->state != slice::SLICE_XMITTED)
      return 0;
  }
  else
  {
    if (slice->state != slice::SLICE_NEW)
      return 0;
  }

  nrBlocks = getSliceBlocks(slice, config);
#ifdef BB_FEATURE_UDPCAST_FEC

  if ((config->flags & FLAG_FEC) && !retransmitting)
  {
    fecBlocks = config->fec_redundancy * config->fec_stripes;
  }
  else
  {
    fecBlocks = 0;
  }

#endif

  if ((sendst->config->flags & FLAG_STREAMING))
  {
    rehello = nrBlocks - sendst->config->rehelloOffset;

    if (rehello < 0)
      rehello = 0;
  }
  else
  {
    rehello = -1;
  }

  /* transmit the data */
  for (i = slice->nextBlock; i < nrBlocks
#ifdef BB_FEATURE_UDPCAST_FEC
                                     + fecBlocks
#endif
       ;
       i++)
  {
    if (retransmitting)
    {
      if (!BIT_ISSET(i, slice->rxmitMap) || BIT_ISSET(i, slice->isXmittedMap))
      {
        /* if slice is not in retransmit list, or has _already_
         * been retransmitted, skip it */
        if (i > slice->lastGoodBlock)
          slice->lastGoodBlock = i;

        continue;
      }

      SET_BIT(i, slice->isXmittedMap);
      retransmissions++;
    }

    if (i == rehello)
    {
      udpc_sendHello(sendst->config, sendst->socket, 1);
    }

    if (i < nrBlocks)
      transmitDataBlock(sendst, slice, i);

#ifdef BB_FEATURE_UDPCAST_FEC
    else
      transmitFecBlock(sendst, slice, i - nrBlocks);

#endif

    if (!retransmitting && pc_getWaiting(sendst->rc.incoming))
    {
      i++;
      break;
    }
  }

  if (retransmissions)
    senderStatsAddRetransmissions(sendst->stats, retransmissions);

  slice->nextBlock = i;

  if (i == nrBlocks
#ifdef BB_FEATURE_UDPCAST_FEC
               + fecBlocks
#endif
  )
  {
    slice->needRxmit = 0;

    if (!retransmitting)
      slice->state = slice::SLICE_XMITTED;

    return 2;
  }

  return 1;
}

int doRetransmissions(sender_state_t sendst, struct slice* slice)
{
  if (slice->state == slice::SLICE_ACKED)
    return 0; /* nothing to do */

  /* FIXME: reduce slice size if needed */
  if (slice->needRxmit)
  {
    /* do some retransmissions */
    sendSlice(sendst, slice, 1);
  }

  return 0;
}

struct slice* makeSlice(sender_state_t sendst, int sliceNo)
{
  struct net_config* config = sendst->config;
  struct fifo* fifo = sendst->fifo;
  int i;
  struct slice* slice = NULL;

  pc_consume(sendst->free_slices_pc, 1);
  i = pc_getConsumerPosition(sendst->free_slices_pc);
  slice = &sendst->slices[i];
  idbassert(slice->state == slice::SLICE_FREE);
  BZERO(*slice);
  pc_consumed(sendst->free_slices_pc, 1);

  slice->base = pc_getConsumerPosition(sendst->fifo->data);
  slice->sliceNo = sliceNo;
  slice->bytes = pc_consume(fifo->data, 10 * config->blockSize);

  /* fixme: use current slice size here */
  if (slice->bytes > config->blockSize * config->sliceSize)
    slice->bytes = config->blockSize * config->sliceSize;

  pc_consumed(fifo->data, slice->bytes);
  slice->nextBlock = 0;
  slice->state = slice::SLICE_NEW;
  BZERO(slice->sl_reqack.readySet);
  slice->nrReady = 0;
#ifdef BB_FEATURE_UDPCAST_FEC
  slice->fec_data = sendst->fec_data + (i * config->fec_stripes * config->fec_redundancy * config->blockSize);
#endif
  return slice;
}

void cancelReturnChannel(struct returnChannel* returnChannel)
{
  /* No need to worry about the pthread_cond_wait in produconsum, because
   * at the point where we enter here (to cancel the thread), we are sure
   * that nobody else uses that produconsum any more
   */
  pthread_cancel(returnChannel->thread);
  pthread_join(returnChannel->thread, NULL);
}

THREAD_RETURN netSenderMain(void* args0)
{
  sender_state_t sendst = (sender_state_t)args0;
  struct net_config* config = sendst->config;
  struct timeval tv;
  struct timespec ts;
  int atEnd = 0;
  int nrWaited = 0;
  unsigned long waitAverage = 10000; /* Exponential average of last wait times */

  struct slice* xmitSlice = NULL;   /* slice being transmitted a first time */
  struct slice* rexmitSlice = NULL; /* slice being re-transmitted */
  int sliceNo = 0;

  /* transmit the data */
  if (config->default_slice_size == 0)
  {
#ifdef BB_FEATURE_UDPCAST_FEC

    if (config->flags & FLAG_FEC)
    {
      config->sliceSize = config->fec_stripesize * config->fec_stripes;
    }
    else
#endif
        if (config->flags & FLAG_ASYNC)
      config->sliceSize = 1024;
    else if (sendst->config->flags & FLAG_SN)
    {
      sendst->config->sliceSize = 112;
    }
    else
      sendst->config->sliceSize = 130;

    sendst->config->discovery = net_config::DSC_DOUBLING;
  }
  else
  {
    config->sliceSize = config->default_slice_size;
#ifdef BB_FEATURE_UDPCAST_FEC

    if ((config->flags & FLAG_FEC) && (config->sliceSize > 128 * config->fec_stripes))
      config->sliceSize = 128 * config->fec_stripes;

#endif
  }

#ifdef BB_FEATURE_UDPCAST_FEC

  if ((sendst->config->flags & FLAG_FEC) && config->max_slice_size > config->fec_stripes * 128)
    config->max_slice_size = config->fec_stripes * 128;

#endif

  if (config->sliceSize > config->max_slice_size)
    config->sliceSize = config->max_slice_size;

  idbassert(config->sliceSize <= MAX_SLICE_SIZE);

  do
  {
    /* first, cleanup rexmit Slice if needed */

    if (rexmitSlice != NULL)
    {
      if (rexmitSlice->nrReady == udpc_nrParticipants(sendst->rc.participantsDb))
      {
#if DEBUG
        flprintf("slice is ready\n");
#endif
        ackSlice(rexmitSlice, sendst->config, sendst->fifo, sendst->stats);
      }

      if (isSliceAcked(rexmitSlice))
      {
        freeSlice(sendst, rexmitSlice);
        rexmitSlice = NULL;
      }
    }

    /* then shift xmit slice to rexmit slot, if possible */
    if (rexmitSlice == NULL && xmitSlice != NULL && isSliceXmitted(xmitSlice))
    {
      rexmitSlice = xmitSlice;
      xmitSlice = NULL;
      sendReqack(rexmitSlice, sendst->config, sendst->fifo, sendst->stats, sendst->socket);
    }

    /* handle any messages */
    if (pc_getWaiting(sendst->rc.incoming))
    {
#if DEBUG
      flprintf("Before message %d\n", pc_getWaiting(sendst->rc.incoming));
#endif
      handleNextMessage(sendst, xmitSlice, rexmitSlice);

      /* restart at beginning of loop: we may have acked the rxmit
       * slice, makeing it possible to shift the pipe */
      continue;
    }

    /* do any needed retransmissions */
    if (rexmitSlice != NULL && rexmitSlice->needRxmit)
    {
      doRetransmissions(sendst, rexmitSlice);
      /* restart at beginning: new messages may have arrived during
       * retransmission  */
      continue;
    }

    /* if all participants answered, send req ack */
    if (rexmitSlice != NULL && rexmitSlice->nrAnswered == udpc_nrParticipants(sendst->rc.participantsDb))
    {
      rexmitSlice->rxmitId++;
      sendReqack(rexmitSlice, sendst->config, sendst->fifo, sendst->stats, sendst->socket);
    }

    if (xmitSlice == NULL && !atEnd)
    {
#if DEBUG
      flprintf("SN=%d\n", sendst->config->flags & FLAG_SN);
#endif

      if ((sendst->config->flags & FLAG_SN) || rexmitSlice == NULL)
      {
#ifdef BB_FEATURE_UDPCAST_FEC

        if (sendst->config->flags & FLAG_FEC)
        {
          int i;
          pc_consume(sendst->fec_data_pc, 1);
          i = pc_getConsumerPosition(sendst->fec_data_pc);
          xmitSlice = &sendst->slices[i];
          pc_consumed(sendst->fec_data_pc, 1);
        }
        else
#endif
        {
          xmitSlice = makeSlice(sendst, sliceNo++);
        }

        if (xmitSlice->bytes == 0)
          atEnd = 1;
      }
    }

    if (xmitSlice != NULL && xmitSlice->state == slice::SLICE_NEW)
    {
      sendSlice(sendst, xmitSlice, 0);
#if DEBUG
      flprintf("%d Interrupted at %d/%d\n", xmitSlice->sliceNo, xmitSlice->nextBlock,
               getSliceBlocks(xmitSlice, sendst->config));
#endif
      continue;
    }

    if (atEnd && rexmitSlice == NULL && xmitSlice == NULL)
      break;

    if (sendst->config->flags & FLAG_ASYNC)
      break;

#if DEBUG
    flprintf("Waiting for timeout...\n");
#endif
    gettimeofday(&tv, 0);
    ts.tv_sec = tv.tv_sec;
    ts.tv_nsec = (long int)((tv.tv_usec + 1.1 * waitAverage) * 1000);

#ifdef WINDOWS
    /* Windows has a granularity of 1 millisecond in its timer. Take this
     * into account here */
#define GRANULARITY 1000000
    ts.tv_nsec += 3 * GRANULARITY / 2;
    ts.tv_nsec -= ts.tv_nsec % GRANULARITY;
#endif

#define BILLION 1000000000

    while (ts.tv_nsec >= BILLION)
    {
      ts.tv_nsec -= BILLION;
      ts.tv_sec++;
    }

    if (rexmitSlice->rxmitId > 10)
      /* after tenth retransmission, wait minimum one second */
      ts.tv_sec++;

    if (pc_consumeAnyWithTimeout(sendst->rc.incoming, &ts) != 0)
    {
#if DEBUG
      flprintf("Have data\n");
#endif
      {
        struct timeval tv2;
        unsigned long timeout;
        gettimeofday(&tv2, 0);
        timeout = (tv2.tv_sec - tv.tv_sec) * 1000000 + tv2.tv_usec - tv.tv_usec;

        if (nrWaited)
          timeout += waitAverage;

        waitAverage += 9; /* compensate against rounding errors */
        waitAverage = (long unsigned int)((0.9 * waitAverage + 0.1 * timeout));
      }
      nrWaited = 0;
      continue;
    }

    if (rexmitSlice == NULL)
    {
      // TODO: FIXME
      // udpc_flprintf("Weird. Timeout and no rxmit slice");
      break;
    }

    if (nrWaited > 5)
    {
#ifndef WINDOWS
      /* on Cygwin, we would get too many of those messages... */
      nrWaited = 0;
#endif
    }

    nrWaited++;

    if (rexmitSlice->rxmitId > config->retriesUntilDrop)
    {
      int i;

      for (i = 0; i < MAX_CLIENTS; i++)
      {
        if (udpc_isParticipantValid(sendst->rc.participantsDb, i) &&
            !BIT_ISSET(i, rexmitSlice->sl_reqack.readySet))
        {
          udpc_removeParticipant(sendst->rc.participantsDb, i);

          if (udpc_nrParticipants(sendst->rc.participantsDb) == 0)
            goto exit_main_loop;
        }
      }

      continue;
    }

    rexmitSlice->rxmitId++;
    sendReqack(rexmitSlice, sendst->config, sendst->fifo, sendst->stats, sendst->socket);
  } while (udpc_nrParticipants(sendst->rc.participantsDb) || (config->flags & FLAG_ASYNC));

exit_main_loop:
  cancelReturnChannel(&sendst->rc);
  pc_produceEnd(sendst->fifo->freeMemQueue);
  return 0;
}

int spawnNetSender(struct fifo* fifo, int sock, struct net_config* config, participantsDb_t db)
{
  int i;

  sender_state_t sendst = MALLOC(struct senderState);
  sendst->fifo = fifo;
  sendst->socket = sock;
  sendst->config = config;
  // sendst->stats = stats;
#ifdef BB_FEATURE_UDPCAST_FEC

  if (sendst->config->flags & FLAG_FEC)
    sendst->fec_data = xmalloc(NR_SLICES * config->fec_stripes * config->fec_redundancy * config->blockSize);

#endif
  sendst->rc.participantsDb = db;
  initReturnChannel(&sendst->rc, sendst->config, sendst->socket);

  sendst->free_slices_pc = pc_makeProduconsum(NR_SLICES, "free slices");
  pc_produce(sendst->free_slices_pc, NR_SLICES);

  for (i = 0; i < NR_SLICES; i++)
    sendst->slices[i].state = slice::SLICE_FREE;

#ifdef BB_FEATURE_UDPCAST_FEC

  if (sendst->config->flags & FLAG_FEC)
  {
    /* Free memory queue is initially full */
    fec_init();
    sendst->fec_data_pc = pc_makeProduconsum(NR_SLICES, "fec data");

    pthread_create(&sendst->fec_thread, NULL, fecMain, sendst);
  }

#endif

  pthread_create(&fifo->thread, NULL, netSenderMain, sendst);
  return 0;
}

struct sockaddr_in* udpc_getParticipantIp(participantsDb_t db, int i)
{
  return &db->clientTable[i].addr;
}

int udpc_getParticipantCapabilities(participantsDb_t db, int i)
{
  return db->clientTable[i].capabilities;
}

unsigned int udpc_getParticipantRcvBuf(participantsDb_t db, int i)
{
  return db->clientTable[i].rcvbuf;
}

int pc_consumeContiguousMinAmount(produconsum_t pc, int amount)
{
  int n = _consumeAny(pc, amount, 0);
  int l = pc->size - (pc->consumed % pc->size);

  if (n > l)
    n = l;

  return n;
}

#define BLOCKSIZE 4096

void localReader(struct fifo* fifo, const uint8_t* buf, uint32_t len)
{
  // cerr << "starting to send " << len << " bytes" << endl;
  uint32_t offset = 0;

  while (1)
  {
    int pos = pc_getConsumerPosition(fifo->freeMemQueue);
    int bytes = pc_consumeContiguousMinAmount(fifo->freeMemQueue, BLOCKSIZE);

    if (bytes > (pos + bytes) % BLOCKSIZE)
      bytes -= (pos + bytes) % BLOCKSIZE;

    if (offset + bytes > len)
      bytes = len - offset;

    // cerr << "sending " << bytes << " bytes from bs..." << endl;
    // bytes = read(in, fifo->dataBuffer + pos, bytes);
    memcpy(fifo->dataBuffer + pos, buf + offset, bytes);
    offset += bytes;

    if (bytes == 0)
    {
      /* the end */
      pc_produceEnd(fifo->data);
      break;
    }
    else
    {
      pc_consumed(fifo->freeMemQueue, bytes);
      pc_produce(fifo->data, bytes);
    }
  }

  // cerr << "done sending" << endl;
}

unsigned int udpc_getRcvBuf(int sock)
{
  unsigned int bufsize;
  socklen_t len = sizeof(int);

  if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*)&bufsize, &len) < 0)
    return -1;

  return bufsize;
}

int sendConnectReq(struct client_config* client_config, struct net_config* net_config, int haveServerAddress)
{
  // cerr << "sending a connect request" << endl;
  struct connectReq connectReq;

  if (net_config->flags & FLAG_PASSIVE)
    return 0;

  connectReq.opCode = htons(CMD_CONNECT_REQ);
  connectReq.reserved = 0;
  connectReq.capabilities = htonl(RECEIVER_CAPABILITIES);
  connectReq.rcvbuf = htonl(udpc_getRcvBuf(client_config->S_UCAST));

  if (haveServerAddress)
    return SSEND(connectReq);
  else
    return BCAST_CONTROL(client_config->S_UCAST, connectReq);
}

void udpc_setRcvBuf(int sock, unsigned int bufsize)
{
  if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*)&bufsize, sizeof(bufsize)) < 0)
  {
    // TODO: FIXME
    // perror("Set receiver buffer");
  }
}

void udpc_copyFromMessage(struct sockaddr_in* dst, unsigned char* src)
{
  memcpy((char*)&dst->sin_addr, src, sizeof(struct in_addr));
}

unsigned short udpc_getPort(struct sockaddr_in* addr)
{
  return ntohs(((struct sockaddr_in*)addr)->sin_port);
}

int udpc_selectSock(int* socks, int nr, int startTimeout)
{
  fd_set read_set;
  int r;
  int maxFd;
  struct timeval tv, *tvp;

  if (startTimeout)
  {
    tv.tv_sec = startTimeout;
    tv.tv_usec = 0;
    tvp = &tv;
  }
  else
  {
    tvp = NULL;
  }

  maxFd = prepareForSelect(socks, nr, &read_set);
  r = select(maxFd + 1, &read_set, NULL, NULL, tvp);

  if (r < 0)
    return r;

  return udpc_getSelectedSock(socks, nr, &read_set);
}

void udpc_zeroSockArray(int* socks, int nr)
{
  int i;

  for (i = 0; i < nr; i++)
    socks[i] = -1;
}

unsigned char* getBlockSpace(struct clientState* clst)
{
  int pos;

  if (clst->localPos)
  {
    clst->localPos--;
    return clst->localBlockAddresses[clst->localPos];
  }

  pc_consume(clst->freeBlocks_pc, 1);
  pos = pc_getConsumerPosition(clst->freeBlocks_pc);
  pc_consumed(clst->freeBlocks_pc, 1);
  return clst->blockAddresses[pos];
}

void setNextBlock(struct clientState* clst)
{
  clst->nextBlock = getBlockSpace(clst);
}

int setupMessages(struct clientState* clst)
{
  /* the messages received from the server */
  clst->data_iov[0].iov_base = (void*)&clst->Msg;
  clst->data_iov[0].iov_len = sizeof(clst->Msg);

  /* namelen set just before reception */
  clst->data_hdr.msg_iov = clst->data_iov;
  /* iovlen set just before reception */

  initMsgHdr(&clst->data_hdr);
  return 0;
}

struct slice* initSlice(struct clientState* clst, struct slice* slice, int sliceNo)
{
  idbassert(slice->state == slice::SLICE_FREE || slice->state == slice::SLICE_RECEIVING);

  slice->magic = SLICEMAGIC;
  slice->state = slice::SLICE_RECEIVING;
  slice->blocksTransferred = 0;
  slice->dataBlocksTransferred = 0;
  BZERO(slice->retransmit);
  slice->freePos = 0;
  slice->bytes = 0;

  if (clst->currentSlice != NULL && !clst->currentSlice->bytesKnown)
  {
    // udpc_fatal(1, "Previous slice size not known\n");
  }

  if (clst->currentSliceNo != sliceNo - 1)
  {
    // udpc_fatal(1, "Slice no mismatch %d <-> %d\n", sliceNo, clst->currentSliceNo);
  }

  slice->bytesKnown = 0;
  slice->sliceNo = sliceNo;

  BZERO(slice->missing_data_blocks);
#ifdef BB_FEATURE_UDPCAST_FEC
  BZERO(slice->fec_stripes);
  BZERO(slice->fec_blocks);
  BZERO(slice->fec_descs);
#endif
  clst->currentSlice = slice;
  clst->currentSliceNo = sliceNo;
  return slice;
}
struct slice* newSlice(struct clientState* clst, int sliceNo)
{
  struct slice* slice = NULL;
  int i;

  pc_consume(clst->free_slices_pc, 1);
  i = pc_getConsumerPosition(clst->free_slices_pc);
  pc_consumed(clst->free_slices_pc, 1);
  slice = &clst->slices[i];
  idbassert(slice->state == slice::SLICE_FREE);

  /* wait for free data memory */
  slice->base = pc_getConsumerPosition(clst->fifo->freeMemQueue);
  pc_consume(clst->fifo->freeMemQueue, clst->net_config->blockSize * MAX_SLICE_SIZE);
  initSlice(clst, slice, sliceNo);
  return slice;
}

#define ADR(x, bs) (fifo->dataBuffer + (slice->base + (x)*bs) % fifo->dataBufSize)

void closeAllExcept(struct clientState* clst, int fd)
{
  int i;
  int* socks = clst->client_config->socks;

  if (clst->selectedFd >= 0)
    return;

  clst->selectedFd = fd;

  for (i = 1; i < NR_CLIENT_SOCKS; i++)
    if (socks[i] != -1 && socks[i] != fd)
      udpc_closeSock(socks, NR_CLIENT_SOCKS, i);
}

struct slice* rcvFindSlice(struct clientState* clst, int sliceNo)
{
  if (!clst->currentSlice)
  {
    /* Streaming mode? */
    clst->currentSliceNo = sliceNo - 1;
    return newSlice(clst, sliceNo);
  }

  if (sliceNo <= clst->currentSliceNo)
  {
    struct slice* slice = clst->currentSlice;
    int pos = slice - clst->slices;
    idbassert(slice == NULL || slice->magic == SLICEMAGIC);

    while (slice->sliceNo != sliceNo)
    {
      if (slice->state == slice::SLICE_FREE)
        return NULL;

      idbassert(slice->magic == SLICEMAGIC);
      pos--;

      if (pos < 0)
        pos += NR_SLICES;

      slice = &clst->slices[pos];
    }

    return slice;
  }

  if (sliceNo != clst->currentSliceNo + 1)
  {
    // TODO: FIXME
    // udpc_flprintf("Slice %d skipped\n", sliceNo-1);
    // exit(1);
  }

  if ((clst->net_config->flags & FLAG_STREAMING) && sliceNo != clst->currentSliceNo)
  {
    return initSlice(clst, clst->currentSlice, sliceNo);
  }

  if (sliceNo > clst->receivedSliceNo + 2)
  {
    // TODO: FIXME
#if 0
        slice_t slice = rcvFindSlice(clst, clst->receivedSliceNo + 1);
        udpc_flprintf("Dropped by server now=%d last=%d\n", sliceNo, clst->receivedSliceNo);

        if (slice != NULL)
            printMissedBlockMap(clst, slice);

        exit(1);
#endif
  }

  return newSlice(clst, sliceNo);
}

void setSliceBytes(struct slice* slice, struct clientState* clst, int bytes)
{
  idbassert(slice->magic == SLICEMAGIC);

  if (slice->bytesKnown)
  {
    if (slice->bytes != bytes)
    {
      // TODO: FIXME
      // udpc_fatal(1, "Byte number mismatch %d <-> %d\n", bytes, slice->bytes);
    }
  }
  else
  {
    slice->bytesKnown = 1;
    slice->bytes = bytes;

    if (bytes == 0)
      clst->netEndReached = 1;
  }
}

void advanceReceivedPointer(struct clientState* clst)
{
  int pos = clst->receivedPtr;

  while (1)
  {
    slice_t slice = &clst->slices[pos];

    if (
#ifdef BB_FEATURE_UDPCAST_FEC
        slice->state != SLICE_FEC && slice->state != SLICE_FEC_DONE &&
#endif
        slice->state != slice::SLICE_DONE)
      break;

    pos++;
    clst->receivedSliceNo = slice->sliceNo;

    if (pos >= NR_SLICES)
      pos -= NR_SLICES;
  }

  clst->receivedPtr = pos;
}

void receiverStatsAddBytes(receiver_stats_t rs, long bytes)
{
  if (rs != NULL)
    rs->totalBytes += bytes;
}

void cleanupSlices(struct clientState* clst, unsigned int doneState)
{
  while (1)
  {
    int pos = pc_getProducerPosition(clst->free_slices_pc);
    int bytes;
    slice_t slice = &clst->slices[pos];

    if (slice->state != (signed)doneState)
      break;

    receiverStatsAddBytes(clst->stats, slice->bytes);
    bytes = slice->bytes;

    /* signal data received */
    if (bytes == 0)
    {
      pc_produceEnd(clst->fifo->data);
    }
    else
      pc_produce(clst->fifo->data, slice->bytes);

    /* free up slice structure */
    clst->slices[pos].state = slice::SLICE_FREE;
    pc_produce(clst->free_slices_pc, 1);

    /* if at end, exit this thread */
    if (!bytes)
    {
      clst->endReached = 2;
    }
  }
}

void checkSliceComplete(struct clientState* clst, struct slice* slice)
{
  int blocksInSlice;

  idbassert(slice->magic == SLICEMAGIC);

  if (slice->state != slice::SLICE_RECEIVING)
    /* bad starting state */
    return;

  /* is this slice ready ? */
  idbassert(clst->net_config->blockSize != 0);
  blocksInSlice = (slice->bytes + clst->net_config->blockSize - 1) / clst->net_config->blockSize;

  if (blocksInSlice == slice->blocksTransferred)
  {
    pc_consumed(clst->fifo->freeMemQueue, slice->bytes);
    clst->net_config->flags &= ~FLAG_STREAMING;

    if (blocksInSlice == slice->dataBlocksTransferred)
      slice->state = slice::SLICE_DONE;
    else
    {
#ifdef BB_FEATURE_UDPCAST_FEC
      idbassert(clst->use_fec == 1);
      slice->state = SLICE_FEC;
#else
      idbassert(0);
#endif
    }

    advanceReceivedPointer(clst);
#ifdef BB_FEATURE_UDPCAST_FEC

    if (clst->use_fec)
    {
      int n = pc_getProducerPosition(clst->fec_data_pc);
      idbassert(slice->state == SLICE_DONE || slice->state == SLICE_FEC);
      clst->fec_slices[n] = slice;
      pc_produce(clst->fec_data_pc, 1);
    }
    else
#endif
      cleanupSlices(clst, slice::SLICE_DONE);
  }
}

int processDataBlock(struct clientState* clst, int sliceNo, int blockNo, int bytes)
{
  // cerr << "processDataBlock(): " << sliceNo << ", " << blockNo << ", " << bytes << endl;
  struct fifo* fifo = clst->fifo;
  struct slice* slice = rcvFindSlice(clst, sliceNo);
  unsigned char *shouldAddress, *isAddress;

  idbassert(slice == NULL || slice->magic == SLICEMAGIC);

  if (slice == NULL || slice->state == slice::SLICE_FREE || slice->state == slice::SLICE_DONE
#ifdef BB_FEATURE_UDPCAST_FEC
      || slice->state == SLICE_FEC || slice->state == SLICE_FEC_DONE
#endif
  )
  {
    /* an old slice. Ignore */
    // cerr << "ignore" << endl;
    return 0;
  }

  if (sliceNo > clst->currentSliceNo + 2)
  {
    // TODO: FIXME
    // udpc_fatal(1, "We have been dropped by sender\n");
  }

  if (BIT_ISSET(blockNo, slice->retransmit.map))
  {
    /* we already have this packet, ignore */
#if 0
        flprintf("Packet %d:%d not for us\n", sliceNo, blockNo);
#endif
    // cerr << "dup" << endl;
    return 0;
  }

  if (slice->base % clst->net_config->blockSize)
  {
    // TODO: FIXME
    // udpc_fatal(1, "Bad base %d, not multiple of block size %d\n", slice->base,
    // clst->net_config->blockSize);
    // cerr << "bad base" << endl;
  }

  // cerr << "good slice" << endl;

  shouldAddress = ADR(blockNo, clst->net_config->blockSize);
  isAddress = (unsigned char*)clst->data_hdr.msg_iov[1].iov_base;

  if (shouldAddress != isAddress)
  {
    /* copy message to the correct place */
    memcpy(shouldAddress, isAddress, clst->net_config->blockSize);
  }

  if (clst->client_config->sender_is_newgen && bytes != 0)
    setSliceBytes(slice, clst, bytes);

  if (clst->client_config->sender_is_newgen && bytes == 0)
    clst->netEndReached = 0;

  SET_BIT(blockNo, slice->retransmit.map);
#ifdef BB_FEATURE_UDPCAST_FEC

  if (slice->fec_stripes)
  {
    int stripe = blockNo % slice->fec_stripes;
    slice->missing_data_blocks[stripe]--;
    idbassert(slice->missing_data_blocks[stripe] >= 0);

    if (slice->missing_data_blocks[stripe] < slice->fec_blocks[stripe])
    {
      int blockIdx;
      /* FIXME: FEC block should be enqueued in local queue here...*/
      slice->fec_blocks[stripe]--;
      blockIdx = stripe + slice->fec_blocks[stripe] * slice->fec_stripes;
      idbassert(slice->fec_descs[blockIdx].adr != 0);
      clst->localBlockAddresses[clst->localPos++] = slice->fec_descs[blockIdx].adr;
      slice->fec_descs[blockIdx].adr = 0;
      slice->blocksTransferred--;
    }
  }

#endif
  slice->dataBlocksTransferred++;
  slice->blocksTransferred++;

  while (slice->freePos < MAX_SLICE_SIZE && BIT_ISSET(slice->freePos, slice->retransmit.map))
    slice->freePos++;

  checkSliceComplete(clst, slice);
  return 0;
}

int sendOk(struct client_config* client_config, unsigned int sliceNo)
{
  struct ok ok;
  ok.opCode = htons(CMD_OK);
  ok.reserved = 0;
  ok.sliceNo = htonl(sliceNo);
  return SSEND(ok);
}

int sendRetransmit(struct clientState* clst, struct slice* slice, int rxmit)
{
  struct client_config* client_config = clst->client_config;

  idbassert(slice->magic == SLICEMAGIC);
  slice->retransmit.opCode = htons(CMD_RETRANSMIT);
  slice->retransmit.reserved = 0;
  slice->retransmit.sliceNo = htonl(slice->sliceNo);
  slice->retransmit.rxmit = htonl(rxmit);
  return SSEND(slice->retransmit);
}

int processReqAck(struct clientState* clst, int sliceNo, int bytes, int rxmit)
{
  struct slice* slice = rcvFindSlice(clst, sliceNo);
  int blocksInSlice;
  char* readySet = (char*)clst->data_hdr.msg_iov[1].iov_base;

  idbassert(slice == NULL || slice->magic == SLICEMAGIC);

  {
    struct timeval tv;
    gettimeofday(&tv, 0);
    /* usleep(1); DEBUG: FIXME */
  }

  if (BIT_ISSET(clst->client_config->clientNumber, readySet))
  {
    /* not for us */
    return 0;
  }

  if (slice == NULL)
  {
    /* an old slice => send ok */
    return sendOk(clst->client_config, sliceNo);
  }

  setSliceBytes(slice, clst, bytes);
  idbassert(clst->net_config->blockSize != 0);
  blocksInSlice = (slice->bytes + clst->net_config->blockSize - 1) / clst->net_config->blockSize;

  if (blocksInSlice == slice->blocksTransferred)
  {
    /* send ok */
    sendOk(clst->client_config, slice->sliceNo);
  }
  else
  {
    sendRetransmit(clst, slice, rxmit);
  }

  checkSliceComplete(clst, slice); /* needed for the final 0 sized slice */
  advanceReceivedPointer(clst);
#ifdef BB_FEATURE_UDPCAST_FEC

  if (!clst->use_fec)
    cleanupSlices(clst, SLICE_DONE);

#endif
  return 0;
}

int udpc_isAddressEqual(struct sockaddr_in* a, struct sockaddr_in* b)
{
  return !memcmp((char*)a, (char*)b, 8);
}
int dispatchMessage(struct clientState* clst)
{
  int ret;
  struct sockaddr_in lserver;
  struct fifo* fifo = clst->fifo;
  int fd = -1;
  struct client_config* client_config = clst->client_config;

  /* set up message header */
  if (clst->currentSlice != NULL && clst->currentSlice->freePos < MAX_SLICE_SIZE)
  {
    struct slice* slice = clst->currentSlice;
    idbassert(slice == NULL || slice->magic == SLICEMAGIC);
    clst->data_iov[1].iov_base = ADR(slice->freePos, clst->net_config->blockSize);
  }
  else
  {
    clst->data_iov[1].iov_base = clst->nextBlock;
  }

  clst->data_iov[1].iov_len = clst->net_config->blockSize;
  clst->data_hdr.msg_iovlen = 2;

  clst->data_hdr.msg_name = &lserver;
  clst->data_hdr.msg_namelen = sizeof(struct sockaddr_in);

  while (clst->endReached || clst->netEndReached)
  {
    int oldEndReached = clst->endReached;
    int nr_desc;
    struct timeval tv;
    fd_set read_set;

    int maxFd = prepareForSelect(client_config->socks, NR_CLIENT_SOCKS, &read_set);

    tv.tv_sec = clst->net_config->exitWait / 1000;
    tv.tv_usec = (clst->net_config->exitWait % 1000) * 1000;
    nr_desc = select(maxFd, &read_set, 0, 0, &tv);

    if (nr_desc < 0)
    {
      break;
    }

    fd = udpc_getSelectedSock(client_config->socks, NR_CLIENT_SOCKS, &read_set);

    if (fd >= 0)
      break;

    /* Timeout expired */
    if (oldEndReached >= 2)
    {
      clst->endReached = 3;
      return 0;
    }
  }

  if (fd < 0)
    fd = clst->selectedFd;

  if (fd < 0)
  {
    struct timeval tv, *tvp;
    fd_set read_set;
    int maxFd = prepareForSelect(client_config->socks, NR_CLIENT_SOCKS, &read_set);
    clst->promptPrinted = 1;

    if (clst->net_config->startTimeout == 0)
    {
      tvp = NULL;
    }
    else
    {
      tv.tv_sec = clst->net_config->startTimeout;
      tv.tv_usec = 0;
      tvp = &tv;
    }

    // cerr << "waiting for data..." << endl;
    ret = selectWithoutConsole(maxFd + 1, &read_set, tvp);

    if (ret < 0)
    {
      perror("Select");
      return 0;
    }

    if (ret == 0)
    {
      clst->endReached = 3;
      clst->netEndReached = 3;
      pc_produceEnd(clst->fifo->data);
      return 1;
    }

    fd = udpc_getSelectedSock(clst->client_config->socks, NR_CLIENT_SOCKS, &read_set);
  }

#ifdef LOSSTEST
  loseRecvPacket(fd);
  ret = RecvMsg(fd, &clst->data_hdr, 0);
#else
  ret = recvmsg(fd, &clst->data_hdr, 0);
// cerr << "got some data on fd " << fd << endl;
#endif

  if (ret < 0)
  {
#if DEBUG
    flprintf("data recvfrom %d: %s\n", fd, strerror(errno));
#endif
    return -1;
  }

#if 0
    fprintf(stderr, "received packet for slice %d, block %d\n",
            ntohl(Msg.sliceNo), ntohs(db.blockNo));
#endif

  if (!udpc_isAddressEqual(&lserver, &clst->client_config->serverAddr))
  {
    return -1;
  }

  switch (ntohs(clst->Msg.opCode))
  {
    case CMD_DATA:
      // cerr << "got CMD_DATA" << endl;
      closeAllExcept(clst, fd);
      // udpc_receiverStatsStartTimer(clst->stats);
      clst->client_config->isStarted = 1;
      return processDataBlock(clst, ntohl(clst->Msg.dataBlock.sliceNo), ntohs(clst->Msg.dataBlock.blockNo),
                              ntohl(clst->Msg.dataBlock.bytes));
#ifdef BB_FEATURE_UDPCAST_FEC

    case CMD_FEC:
      // cerr << "got CMD_FEC" << endl;
      closeAllExcept(clst, fd);
      // receiverStatsStartTimer(clst->stats);
      clst->client_config->isStarted = 1;
      return processFecBlock(clst, ntohs(clst->Msg.fecBlock.stripes), ntohl(clst->Msg.fecBlock.sliceNo),
                             ntohs(clst->Msg.fecBlock.blockNo), ntohl(clst->Msg.fecBlock.bytes));
#endif

    case CMD_REQACK:
      // cerr << "got CMD_REQACK" << endl;
      closeAllExcept(clst, fd);
      // receiverStatsStartTimer(clst->stats);
      clst->client_config->isStarted = 1;
      return processReqAck(clst, ntohl(clst->Msg.reqack.sliceNo), ntohl(clst->Msg.reqack.bytes),
                           ntohl(clst->Msg.reqack.rxmit));

    case CMD_HELLO_STREAMING:
    case CMD_HELLO_NEW:
    case CMD_HELLO:
      // cerr << "got CMD_HELLO" << endl;
      /* retransmission of hello to find other participants ==> ignore */
      return 0;

    default: break;
  }

  return -1;
}

THREAD_RETURN netReceiverMain(void* args0)
{
  struct clientState* clst = (struct clientState*)args0;

  clst->currentSliceNo = 0;
  setupMessages(clst);

  clst->currentSliceNo = -1;
  clst->currentSlice = NULL;
  clst->promptPrinted = 0;

  if (!(clst->net_config->flags & FLAG_STREAMING))
    newSlice(clst, 0);
  else
  {
    clst->currentSlice = NULL;
    clst->currentSliceNo = 0;
  }

  while (clst->endReached < 3)
  {
    dispatchMessage(clst);
  }

#ifdef BB_FEATURE_UDPCAST_FEC

  if (clst->use_fec)
    pthread_join(clst->fec_thread, NULL);

#endif
  return 0;
}

int spawnNetReceiver(struct fifo* fifo, struct client_config* client_config, struct net_config* net_config)
{
  int i;
  struct clientState* clst = MALLOC(struct clientState);
  clst->fifo = fifo;
  clst->client_config = client_config;
  clst->net_config = net_config;
  // clst->stats = stats;
  clst->endReached = 0;
  clst->netEndReached = 0;
  clst->selectedFd = -1;

  clst->free_slices_pc = pc_makeProduconsum(NR_SLICES, "free slices");
  pc_produce(clst->free_slices_pc, NR_SLICES);

  for (i = 0; i < NR_SLICES; i++)
    clst->slices[i].state = slice::SLICE_FREE;

  clst->receivedPtr = 0;
  clst->receivedSliceNo = 0;

#ifdef BB_FEATURE_UDPCAST_FEC
  fec_init(); /* fec new involves memory
               * allocation. Better do it here */
  clst->use_fec = 0;
  clst->fec_data_pc = pc_makeProduconsum(NR_SLICES, "fec data");
#endif

#define NR_BLOCKS 4096
  clst->freeBlocks_pc = pc_makeProduconsum(NR_BLOCKS, "free blocks");
  pc_produce(clst->freeBlocks_pc, NR_BLOCKS);
  clst->blockAddresses = (unsigned char**)calloc(NR_BLOCKS, sizeof(char*));
  clst->localBlockAddresses = (unsigned char**)calloc(NR_BLOCKS, sizeof(char*));
  clst->blockData = (unsigned char*)malloc(NR_BLOCKS * net_config->blockSize);

  for (i = 0; i < NR_BLOCKS; i++)
    clst->blockAddresses[i] = clst->blockData + i * net_config->blockSize;

  clst->localPos = 0;

  setNextBlock(clst);
  return pthread_create(&client_config->thread, NULL, netReceiverMain, clst);
}

unsigned int pc_getSize(produconsum_t pc)
{
  return pc->size;
}

int writer(struct fifo* fifo, SBS outbs)
{
  // cerr << "start appending to outbs" << endl;
  outbs->restart();
  int fifoSize = pc_getSize(fifo->data);

  while (1)
  {
    int pos = pc_getConsumerPosition(fifo->data);
    int bytes = pc_consumeContiguousMinAmount(fifo->data, BLOCKSIZE);

    if (bytes == 0)
    {
      // cerr << "done appending to outbs: " << outbs->length() << endl;
      return 0;
    }

    /*
     * If we have more than blocksize, round down to nearest blocksize
     * multiple
     */
    if (pos + bytes != fifoSize && bytes > (pos + bytes) % BLOCKSIZE)
      bytes -= (pos + bytes) % BLOCKSIZE;

    /* make sure we don't write to big a chunk... Better to
     * liberate small chunks one by one rather than attempt to
     * write out a bigger chunk and block reception for too
     * long */
    if (bytes > 128 * 1024)
      bytes = 64 * 1024;

    // bytes = write(outFile, fifo->dataBuffer + pos, bytes);
    // cerr << "appending " << bytes << " bytes to outbs..." << endl;
    outbs->append(fifo->dataBuffer + pos, bytes);
    pc_consumed(fifo->data, bytes);
    pc_produce(fifo->freeMemQueue, bytes);
  }
}
}  // namespace

namespace multicast
{
MulticastImpl::MulticastImpl(int min_receivers, const string& ifName, int portBase, int bufSize)
 : fIfName(ifName), fDb(0)
{
  udpc_clearIp(&fNet_config.dataMcastAddr);
  fNet_config.mcastRdv = 0;
  fNet_config.blockSize = 1024;  // 1456;
  fNet_config.sliceSize = 16;
  fNet_config.portBase = portBase;
  fNet_config.nrGovernors = 0;
  fNet_config.flags = FLAG_NOKBD;
  fNet_config.capabilities = 0;
  fNet_config.min_slice_size = 16;
  fNet_config.max_slice_size = 1024;
  fNet_config.default_slice_size = 0;
  fNet_config.ttl = 1;
  fNet_config.rexmit_hello_interval = 2000;
  fNet_config.autostart = 0;
  fNet_config.requestedBufSize = bufSize;

  fNet_config.min_receivers = min_receivers;
  fNet_config.max_receivers_wait = 0;
  fNet_config.min_receivers_wait = 0;

  fNet_config.retriesUntilDrop = 200;

  fNet_config.rehelloOffset = 50;

  fStat_config.log = 0;
  fStat_config.bwPeriod = 0;
  fStat_config.printUncompressedPos = -1;
  fStat_config.statPeriod = DEFLT_STAT_PERIOD;

  fNet_config.net_if = 0;

  // full-duplex
  fNet_config.flags |= FLAG_SN;

  if (fIfName.empty())
    fIfName = "eth0";

  fSock[0] = -1;
  fSock[1] = -1;
  fSock[2] = -1;
}

MulticastImpl::~MulticastImpl()
{
  delete fDb;

  for (int i = 0; i < 3; i++)
  {
    if (fSock[i] >= 0)
    {
      shutdown(fSock[i], SHUT_RDWR);
      close(fSock[i]);
    }
  }
}

void MulticastImpl::startSender()
{
  int tries;
  time_t firstConnected = 0;
  time_t* firstConnectedP;

  // participantsDb_t db;

  /* make the socket and print banner */
  // int fSock[3];
  int nr = 0;
  int fd;
  int r;
  int j;

  fNet_config.net_if = udpc_getNetIf(fIfName.c_str());

  fSock[nr++] = udpc_makeSocket(ADDR_TYPE_UCAST, fNet_config.net_if, NULL, SENDER_PORT(fNet_config.portBase));
  // cerr << "sock[" << (nr-1) << "] = " << fSock[(nr-1)] << endl;

  if (!(fNet_config.flags & (FLAG_SN | FLAG_NOTSN)))
  {
    if (udpc_isFullDuplex(fSock[0], fNet_config.net_if->name) == 1)
    {
      fNet_config.flags |= FLAG_SN;
    }
  }

  fd = udpc_makeSocket(ADDR_TYPE_BCAST, fNet_config.net_if, NULL, SENDER_PORT(fNet_config.portBase));

  if (fd >= 0)
    fSock[nr++] = fd;

  // cerr << "sock[" << (nr-1) << "] = " << fSock[(nr-1)] << endl;

  if (fNet_config.requestedBufSize)
    udpc_setSendBuf(fSock[0], fNet_config.requestedBufSize);

  fNet_config.controlMcastAddr.sin_addr.s_addr = 0;

  if (fNet_config.ttl == 1 && fNet_config.mcastRdv == NULL)
  {
    udpc_getBroadCastAddress(fNet_config.net_if, &fNet_config.controlMcastAddr,
                             RECEIVER_PORT(fNet_config.portBase));
    udpc_setSocketToBroadcast(fSock[0]);
  }

  if (fNet_config.controlMcastAddr.sin_addr.s_addr == 0)
  {
    udpc_getMcastAllAddress(&fNet_config.controlMcastAddr, fNet_config.mcastRdv,
                            RECEIVER_PORT(fNet_config.portBase));

    /* Only do the following if controlMcastAddr is indeed an
       mcast address ... */
    if (isMcastAddress(&fNet_config.controlMcastAddr))
    {
      udpc_setMcastDestination(fSock[0], fNet_config.net_if, &fNet_config.controlMcastAddr);
      udpc_setTtl(fSock[0], fNet_config.ttl);
      fSock[nr++] = udpc_makeSocket(ADDR_TYPE_MCAST, fNet_config.net_if, &fNet_config.controlMcastAddr,
                                    SENDER_PORT(fNet_config.portBase));
      // cerr << "sock[" << (nr-1) << "] = " << fSock[(nr-1)] << endl;
    }
  }

  if (!(fNet_config.flags & FLAG_POINTOPOINT) && udpc_ipIsZero(&fNet_config.dataMcastAddr))
  {
    udpc_getDefaultMcastAddress(fNet_config.net_if, &fNet_config.dataMcastAddr);
  }

  if (fNet_config.flags & FLAG_POINTOPOINT)
  {
    udpc_clearIp(&fNet_config.dataMcastAddr);
  }

  setPort(&fNet_config.dataMcastAddr, RECEIVER_PORT(fNet_config.portBase));

  fNet_config.capabilities = SENDER_CAPABILITIES;

  if (fNet_config.flags & FLAG_ASYNC)
    fNet_config.capabilities |= CAP_ASYNC;

  udpc_sendHello(&fNet_config, fSock[0], 0);

  fDb = udpc_makeParticipantsDb();
  tries = 0;

  if (fNet_config.min_receivers || fNet_config.min_receivers_wait || fNet_config.max_receivers_wait)
    firstConnectedP = &firstConnected;
  else
    firstConnectedP = NULL;

  while (!(r = mainDispatcher(fSock, nr, fDb, &fNet_config, &tries, firstConnectedP)))
    ;

  for (j = 1; j < nr; j++)
    if (fSock[j] != fSock[0])
      closesocket(fSock[j]);

  if (r == 1)
  {
    int i;

    for (i = 1; i < nr; i++)
      udpc_closeSock(fSock, nr, i);
  }

  // doTransfer(fSock[0], fDb, &fNet_config, &fStat_config);
}

void MulticastImpl::doTransfer(const uint8_t* buf, uint32_t len)
{
  int i;
  int ret;
  struct fifo fifo;
  int isPtP = isPointToPoint(fDb, fNet_config.flags);

  fNet_config.rcvbuf = 0;

  for (i = 0; i < MAX_CLIENTS; i++)
    if (udpc_isParticipantValid(fDb, i))
    {
      unsigned int pRcvBuf = udpc_getParticipantRcvBuf(fDb, i);

      if (isPtP)
        udpc_copyIpFrom(&fNet_config.dataMcastAddr, udpc_getParticipantIp(fDb, i));

      fNet_config.capabilities &= udpc_getParticipantCapabilities(fDb, i);

      if (pRcvBuf != 0 && (fNet_config.rcvbuf == 0 || fNet_config.rcvbuf > pRcvBuf))
        fNet_config.rcvbuf = pRcvBuf;
    }

  if (isMcastAddress(&fNet_config.dataMcastAddr))
    udpc_setMcastDestination(fSock[0], fNet_config.net_if, &fNet_config.dataMcastAddr);

  if (!(fNet_config.capabilities & CAP_BIG_ENDIAN))
  {
    // TODO: FIXME
    // udpc_fatal(1, "Peer with incompatible endianness");
  }

  if (!(fNet_config.capabilities & CAP_NEW_GEN))
  {
    fNet_config.dataMcastAddr = fNet_config.controlMcastAddr;
    fNet_config.flags &= ~(FLAG_SN | FLAG_ASYNC);
  }

  if (fNet_config.flags & FLAG_BCAST)
    fNet_config.dataMcastAddr = fNet_config.controlMcastAddr;

  udpc_initFifo(&fifo, fNet_config.blockSize);
  ret = spawnNetSender(&fifo, fSock[0], &fNet_config, fDb);
  localReader(&fifo, buf, len);

  pthread_join(fifo.thread, NULL);
}

void MulticastImpl::startReceiver()
{
  union serverControlMsg Msg;
  int connectReqSent = 0;
  struct sockaddr_in myIp;
  int haveServerAddress;

  fClient_config.sender_is_newgen = 0;

  fNet_config.net_if = udpc_getNetIf(fIfName.c_str());
  {
    fprintf(stderr, "net_if:\n\taddr = 0x%x\n\tbcast = 0x%x\n\tname = %s\n\tindex = %d\n",
            fNet_config.net_if->addr.s_addr, fNet_config.net_if->bcast.s_addr, fNet_config.net_if->name,
            fNet_config.net_if->index);
  }

  udpc_zeroSockArray(fClient_config.socks, NR_CLIENT_SOCKS);

  fClient_config.S_UCAST =
      udpc_makeSocket(ADDR_TYPE_UCAST, fNet_config.net_if, 0, RECEIVER_PORT(fNet_config.portBase));
  // cerr << "S_UCAST = " << fClient_config.S_UCAST << endl;
  fClient_config.S_BCAST =
      udpc_makeSocket(ADDR_TYPE_BCAST, fNet_config.net_if, 0, RECEIVER_PORT(fNet_config.portBase));
  // cerr << "S_BCAST = " << fClient_config.S_BCAST << endl;

  if (fNet_config.ttl == 1 && fNet_config.mcastRdv == NULL)
  {
    udpc_getBroadCastAddress(fNet_config.net_if, &fNet_config.controlMcastAddr,
                             SENDER_PORT(fNet_config.portBase));
    udpc_setSocketToBroadcast(fClient_config.S_UCAST);
  }
  else
  {
    udpc_getMcastAllAddress(&fNet_config.controlMcastAddr, fNet_config.mcastRdv,
                            SENDER_PORT(fNet_config.portBase));

    if (isMcastAddress(&fNet_config.controlMcastAddr))
    {
      udpc_setMcastDestination(fClient_config.S_UCAST, fNet_config.net_if, &fNet_config.controlMcastAddr);
      udpc_setTtl(fClient_config.S_UCAST, fNet_config.ttl);

      fClient_config.S_MCAST_CTRL =
          udpc_makeSocket(ADDR_TYPE_MCAST, fNet_config.net_if, &fNet_config.controlMcastAddr,
                          RECEIVER_PORT(fNet_config.portBase));
      // cerr << "S_MCAST_CTRL = " << fClient_config.S_MCAST_CTRL << endl;
      // TODO: subscribe address as receiver to!
    }
  }

  udpc_clearIp(&fNet_config.dataMcastAddr);

  connectReqSent = 0;
  haveServerAddress = 0;

  fClient_config.clientNumber = 0; /*default number for asynchronous transfer*/

  while (1)
  {
    // int len;
    int msglen;
    int sock;

    if (!connectReqSent)
    {
      if (sendConnectReq(&fClient_config, &fNet_config, haveServerAddress) < 0)
      {
        // TODO: FIXME
        // perror("sendto to locate server");
      }

      connectReqSent = 1;
    }

    haveServerAddress = 0;

    // cerr << "waiting for msg..." << flush << endl;
    sock = udpc_selectSock(fClient_config.socks, NR_CLIENT_SOCKS, fNet_config.startTimeout);

    // cerr << "got something" << endl;
    if (sock < 0)
    {
      // TODO: FIXME
    }

    // len = sizeof(server);
    msglen = RECV(sock, Msg, fClient_config.serverAddr, fNet_config.portBase);

    if (msglen < 0)
    {
      // TODO: FIXME
      // perror("recvfrom to locate server");
      // exit(1);
    }

    if (udpc_getPort(&fClient_config.serverAddr) != SENDER_PORT(fNet_config.portBase))
      /* not from the right port */
      continue;

    switch (ntohs(Msg.opCode))
    {
      case CMD_CONNECT_REPLY:
        // cerr << "got conrep" << endl;
        fClient_config.clientNumber = ntohl(Msg.connectReply.clNr);
        fNet_config.blockSize = ntohl(Msg.connectReply.blockSize);

        if (ntohl(Msg.connectReply.capabilities) & CAP_NEW_GEN)
        {
          fClient_config.sender_is_newgen = 1;
          udpc_copyFromMessage(&fNet_config.dataMcastAddr, Msg.connectReply.mcastAddr);
        }

        if (fClient_config.clientNumber == -1)
        {
          // TODO: FIXME
          // udpc_fatal(1, "Too many clients already connected\n");
        }

        goto break_loop;

      case CMD_HELLO_STREAMING:
      case CMD_HELLO_NEW:
      case CMD_HELLO:
        // cerr << "got hello" << endl;
        connectReqSent = 0;

        if (ntohs(Msg.opCode) == CMD_HELLO_STREAMING)
          fNet_config.flags |= FLAG_STREAMING;

        if (ntohl(Msg.hello.capabilities) & CAP_NEW_GEN)
        {
          fClient_config.sender_is_newgen = 1;
          udpc_copyFromMessage(&fNet_config.dataMcastAddr, Msg.hello.mcastAddr);
          fNet_config.blockSize = ntohs(Msg.hello.blockSize);

          if (ntohl(Msg.hello.capabilities) & CAP_ASYNC)
            fNet_config.flags |= FLAG_PASSIVE;

          if (fNet_config.flags & FLAG_PASSIVE)
            goto break_loop;
        }

        haveServerAddress = 1;
        continue;

      case CMD_CONNECT_REQ:
      case CMD_DATA:
      case CMD_FEC: continue;

      default: break;
    }

    // TODO: FIXME
    // udpc_fatal(1, "Bad server reply %04x. Other transfer in progress?\n", (unsigned short)
    // ntohs(Msg.opCode));
  }

break_loop:

  udpc_getMyAddress(fNet_config.net_if, &myIp);

  if (!udpc_ipIsZero(&fNet_config.dataMcastAddr) && !udpc_ipIsEqual(&fNet_config.dataMcastAddr, &myIp) &&
      (udpc_ipIsZero(&fNet_config.controlMcastAddr) ||
       !udpc_ipIsEqual(&fNet_config.dataMcastAddr, &fNet_config.controlMcastAddr)))
  {
    fClient_config.S_MCAST_DATA = udpc_makeSocket(
        ADDR_TYPE_MCAST, fNet_config.net_if, &fNet_config.dataMcastAddr, RECEIVER_PORT(fNet_config.portBase));
    // cerr << "S_MCAST_DATA = " << fClient_config.S_MCAST_DATA << endl;
  }

  if (fNet_config.requestedBufSize)
  {
    int i;

    for (i = 0; i < NR_CLIENT_SOCKS; i++)
      if (fClient_config.socks[i] != -1)
        udpc_setRcvBuf(fClient_config.socks[i], fNet_config.requestedBufSize);
  }
}

void MulticastImpl::receive(SBS outbs)
{
  struct fifo fifo;

  udpc_initFifo(&fifo, fNet_config.blockSize);
  fifo.data = pc_makeProduconsum(fifo.dataBufSize, "receive");
  fClient_config.isStarted = 0;

  spawnNetReceiver(&fifo, &fClient_config, &fNet_config);
  writer(&fifo, outbs);

  pthread_join(fClient_config.thread, NULL);
}

}  // namespace multicast
